From 420c55b4b98f8e852ac0b02ea1d9469e3569f058 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 19 May 2021 11:37:57 +0200 Subject: [PATCH 01/23] Added the directory and the action for the experimental features e2e/conformance tests Signed-off-by: Francesco Guardiani --- .github/workflows/kind-e2e.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/kind-e2e.yaml b/.github/workflows/kind-e2e.yaml index b973e7ced1b..78d15a047e3 100644 --- a/.github/workflows/kind-e2e.yaml +++ b/.github/workflows/kind-e2e.yaml @@ -56,6 +56,8 @@ jobs: -brokers=eventing.knative.dev/v1:MTChannelBasedBroker -channels=messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1:ApiServerSource,sources.knative.dev/v1:ContainerSource,sources.knative.dev/v1beta2:PingSource + - test-suite: ./test/experimental + extra-test-flags: -tags=e2e_experimental env: GOPATH: ${{ github.workspace }} GO111MODULE: off From a610b086f1811016d436b420ea88213dea98105f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 19 May 2021 11:51:37 +0200 Subject: [PATCH 02/23] Removed custom build tag and using just e2e tag Fix e2e tests Signed-off-by: Francesco Guardiani --- .github/workflows/kind-e2e.yaml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/kind-e2e.yaml b/.github/workflows/kind-e2e.yaml index 78d15a047e3..b973e7ced1b 100644 --- a/.github/workflows/kind-e2e.yaml +++ b/.github/workflows/kind-e2e.yaml @@ -56,8 +56,6 @@ jobs: -brokers=eventing.knative.dev/v1:MTChannelBasedBroker -channels=messaging.knative.dev/v1:Channel,messaging.knative.dev/v1:InMemoryChannel -sources=sources.knative.dev/v1:ApiServerSource,sources.knative.dev/v1:ContainerSource,sources.knative.dev/v1beta2:PingSource - - test-suite: ./test/experimental - extra-test-flags: -tags=e2e_experimental env: GOPATH: ${{ github.workspace }} GO111MODULE: off From aa4fc7932279f38beb47dfddf51df283b00b6844 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 18 May 2021 18:00:27 +0200 Subject: [PATCH 03/23] Progress Signed-off-by: Francesco Guardiani --- config/core/resources/subscription.yaml | 1 + .../messaging/v1/subscription_validation.go | 77 ++++++- .../v1/subscription_validation_test.go | 189 ++++++++++++++++++ pkg/reconciler/subscription/controller.go | 2 + pkg/reconciler/subscription/subscription.go | 18 ++ 5 files changed, 285 insertions(+), 2 deletions(-) diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index 8e035f980e1..ce90004a2b1 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -127,6 +127,7 @@ spec: namespace: description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.' type: string + x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature uri: description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref. type: string diff --git a/pkg/apis/messaging/v1/subscription_validation.go b/pkg/apis/messaging/v1/subscription_validation.go index 93bcdf8c0ea..92487877c58 100644 --- a/pkg/apis/messaging/v1/subscription_validation.go +++ b/pkg/apis/messaging/v1/subscription_validation.go @@ -18,6 +18,7 @@ package v1 import ( "context" + "fmt" "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/api/equality" @@ -57,8 +58,16 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { } if !missingSubscriber { - if fe := ss.Subscriber.Validate(ctx); fe != nil { - errs = errs.Also(fe.ViaField("subscriber")) + if true /* experimental feature enable */ { + // Special validation to enable the experimental feature. + // This logic is going to be moved back to Destination.Validate once the experimentation is done + if fe := validateDestinationWithGroupExperimentalFeatureEnabled(ctx, ss.Subscriber); fe != nil { + errs = errs.Also(fe.ViaField("subscriber")) + } + } else { + if fe := ss.Subscriber.Validate(ctx); fe != nil { + errs = errs.Also(fe.ViaField("subscriber")) + } } } @@ -97,3 +106,67 @@ func (s *Subscription) CheckImmutableFields(ctx context.Context, original *Subsc } return nil } + +func validateDestinationWithGroupExperimentalFeatureEnabled(ctx context.Context, dest *duckv1.Destination) *apis.FieldError { + ref := dest.Ref + uri := dest.URI + if ref == nil && uri == nil { + return apis.ErrGeneric("expected at least one, got none", "ref", "uri") + } + + if ref != nil && uri != nil && uri.URL().IsAbs() { + return apis.ErrGeneric("Absolute URI is not allowed when Ref or [apiVersion, kind, name] is present", "[apiVersion, kind, name]", "ref", "uri") + } + // IsAbs() check whether the URL has a non-empty scheme. Besides the non-empty scheme, we also require uri has a non-empty host + if ref == nil && uri != nil && (!uri.URL().IsAbs() || uri.Host == "") { + return apis.ErrInvalidValue("Relative URI is not allowed when Ref and [apiVersion, kind, name] is absent", "uri") + } + if ref != nil && uri == nil { + return ref.Validate(ctx).ViaField("ref") + } + return nil +} + +func validateKReferenceWithGroupExperimentalFeatureEnabled(ctx context.Context, kr *duckv1.KReference) *apis.FieldError { + var errs *apis.FieldError + if kr == nil { + return errs.Also(apis.ErrMissingField("name")). + Also(apis.ErrMissingField("apiVersion")). + Also(apis.ErrMissingField("kind")) + } + if kr.Name == "" { + errs = errs.Also(apis.ErrMissingField("name")) + } + if kr.APIVersion == "" && kr.Group == "" { + errs = errs.Also(apis.ErrMissingField("apiVersion")). + Also(apis.ErrMissingField("group")) + } + if kr.APIVersion != "" && kr.Group != "" { + errs = errs.Also(&apis.FieldError{ + Message: "both apiVersion and group are specified", + Paths: []string{"apiVersion", "group"}, + Details: "Only one of them must be specified", + }) + } + if kr.Kind == "" { + errs = errs.Also(apis.ErrMissingField("kind")) + } + // Only if namespace is empty validate it. This is to deal with legacy + // objects in the storage that may now have the namespace filled in. + // Because things get defaulted in other cases, moving forward the + // kr.Namespace will not be empty. + if kr.Namespace != "" { + if !apis.IsDifferentNamespaceAllowed(ctx) { + parentNS := apis.ParentMeta(ctx).Namespace + if parentNS != "" && kr.Namespace != parentNS { + errs = errs.Also(&apis.FieldError{ + Message: "mismatched namespaces", + Paths: []string{"namespace"}, + Details: fmt.Sprintf("parent namespace: %q does not match ref: %q", parentNS, kr.Namespace), + }) + } + } + } + + return errs +} diff --git a/pkg/apis/messaging/v1/subscription_validation_test.go b/pkg/apis/messaging/v1/subscription_validation_test.go index 9e2f3e3d2c3..c8781e5be48 100644 --- a/pkg/apis/messaging/v1/subscription_validation_test.go +++ b/pkg/apis/messaging/v1/subscription_validation_test.go @@ -243,6 +243,195 @@ func TestSubscriptionSpecValidation(t *testing.T) { } } +func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { + tests := []struct { + name string + c *SubscriptionSpec + want *apis.FieldError + }{{ + name: "valid", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + }, + want: nil, + }, { + name: "valid with reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: getValidReply(), + }, + want: nil, + }, { + name: "empty Channel", + c: &SubscriptionSpec{ + Channel: corev1.ObjectReference{}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channel") + fe.Details = "the Subscription must reference a channel" + return fe + }(), + }, { + name: "missing name in Channel", + c: &SubscriptionSpec{ + Channel: corev1.ObjectReference{ + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + Subscriber: getValidDestination(), + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channel.name") + return fe + }(), + }, { + name: "missing Subscriber and Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply", "subscriber") + fe.Details = "the Subscription must reference at least one of (reply or a subscriber)" + return fe + }(), + }, { + name: "empty Subscriber and Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: &duckv1.Destination{}, + Reply: &duckv1.Destination{}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply", "subscriber") + fe.Details = "the Subscription must reference at least one of (reply or a subscriber)" + return fe + }(), + }, { + name: "missing Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + }, + want: nil, + }, { + name: "empty Reply", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: &duckv1.Destination{}, + }, + want: nil, + }, { + name: "missing Subscriber", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Reply: getValidReply(), + }, + want: nil, + }, { + name: "empty Subscriber", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: &duckv1.Destination{}, + Reply: getValidReply(), + }, + want: nil, + }, { + name: "missing name in channel, and missing subscriber, reply", + c: &SubscriptionSpec{ + Channel: corev1.ObjectReference{ + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply", "subscriber") + fe.Details = "the Subscription must reference at least one of (reply or a subscriber)" + return apis.ErrMissingField("channel.name").Also(fe) + }(), + }, { + name: "empty", + c: &SubscriptionSpec{}, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channel") + fe.Details = "the Subscription must reference a channel" + return fe + }(), + }, { + name: "missing name in Subscriber.Ref", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Namespace: namespace, + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + }, + }, + want: apis.ErrMissingField("subscriber.ref.name"), + }, { + name: "missing name in Subscriber.Ref", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Namespace: namespace, + Name: "", + Kind: channelKind, + APIVersion: channelAPIVersion, + }, + }, + }, + want: apis.ErrMissingField("reply.ref.name"), + }, { + name: "both api version and group are empty", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Namespace: namespace, + Name: "abc", + Kind: channelKind, + }, + }, + }, + want: apis.ErrMissingField("reply.ref.apiVersion").Also(apis.ErrMissingField("reply.ref.group")), + }, { + name: "both api version and group are there", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Subscriber: getValidDestination(), + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Namespace: namespace, + Name: "abc", + Kind: channelKind, + Group: "myGroup", + }, + }, + }, + want: &apis.FieldError{ + Message: "both apiVersion and group are specified", + Paths: []string{"reply.ref.apiVersion", "reply.ref.group"}, + Details: "Only one of them must be specified", + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.c.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("%s: validateChannel (-want, +got) = %v", test.name, diff) + } + }) + } +} + func TestSubscriptionImmutable(t *testing.T) { newChannel := getValidChannelRef() newChannel.Name = "newChannel" diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 63e3931a98f..65737db8939 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -19,6 +19,7 @@ package subscription import ( "context" + "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" @@ -46,6 +47,7 @@ func NewController( r := &Reconciler{ dynamicClientSet: dynamicclient.Get(ctx), + crdLister: customresourcedefinition.Get(ctx).Lister(), subscriptionLister: subscriptionInformer.Lister(), channelLister: channelInformer.Lister(), } diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index d264cdaeedd..ab3514b323c 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -22,6 +22,8 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -62,6 +64,9 @@ type Reconciler struct { // DynamicClientSet allows us to configure pluggable Build objects dynamicClientSet dynamic.Interface + // crdLister is used to resolve the ref version + crdLister apiextensionsv1lister.CustomResourceDefinitionLister + // listers index properties about resources subscriptionLister listers.SubscriptionLister channelLister listers.ChannelLister @@ -201,6 +206,19 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub if subscriber.Ref != nil { subscriber.Ref.Namespace = subscription.Namespace } + + // Resolve the group + if subscriber.Ref != nil && true /* feature enabled */ { + err := subscriber.Ref.ResolveGroup(r.crdLister) + if err != nil { + logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", + zap.Error(err), + zap.Any("subscriber", subscriber)) + subscription.Status.MarkReferencesNotResolved(subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %v", err) + return pkgreconciler.NewEvent(corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %w", err) + } + } + subscriberURI, err := r.destinationResolver.URIFromDestinationV1(ctx, *subscriber, subscription) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber", From a1cfe8b08d7796884e1497c8e05ac6a5cac5db26 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 19 May 2021 09:57:11 +0200 Subject: [PATCH 04/23] Trying with my fork of pkg Signed-off-by: Francesco Guardiani --- go.mod | 2 + .../messaging/v1/subscription_validation.go | 79 ++----------------- .../v1/subscription_validation_test.go | 33 -------- 3 files changed, 7 insertions(+), 107 deletions(-) diff --git a/go.mod b/go.mod index af4930ef0df..cf13eaad9ed 100644 --- a/go.mod +++ b/go.mod @@ -48,3 +48,5 @@ require ( ) replace github.com/prometheus/client_golang => github.com/prometheus/client_golang v0.9.2 + +replace knative.dev/pkg => ../pkg diff --git a/pkg/apis/messaging/v1/subscription_validation.go b/pkg/apis/messaging/v1/subscription_validation.go index 92487877c58..5b8fbc5eadd 100644 --- a/pkg/apis/messaging/v1/subscription_validation.go +++ b/pkg/apis/messaging/v1/subscription_validation.go @@ -18,7 +18,6 @@ package v1 import ( "context" - "fmt" "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/api/equality" @@ -58,16 +57,12 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { } if !missingSubscriber { + ctx := ctx if true /* experimental feature enable */ { - // Special validation to enable the experimental feature. - // This logic is going to be moved back to Destination.Validate once the experimentation is done - if fe := validateDestinationWithGroupExperimentalFeatureEnabled(ctx, ss.Subscriber); fe != nil { - errs = errs.Also(fe.ViaField("subscriber")) - } - } else { - if fe := ss.Subscriber.Validate(ctx); fe != nil { - errs = errs.Also(fe.ViaField("subscriber")) - } + ctx = duckv1.KReferenceGroupAllowed(ctx) + } + if fe := ss.Subscriber.Validate(ctx); fe != nil { + errs = errs.Also(fe.ViaField("subscriber")) } } @@ -106,67 +101,3 @@ func (s *Subscription) CheckImmutableFields(ctx context.Context, original *Subsc } return nil } - -func validateDestinationWithGroupExperimentalFeatureEnabled(ctx context.Context, dest *duckv1.Destination) *apis.FieldError { - ref := dest.Ref - uri := dest.URI - if ref == nil && uri == nil { - return apis.ErrGeneric("expected at least one, got none", "ref", "uri") - } - - if ref != nil && uri != nil && uri.URL().IsAbs() { - return apis.ErrGeneric("Absolute URI is not allowed when Ref or [apiVersion, kind, name] is present", "[apiVersion, kind, name]", "ref", "uri") - } - // IsAbs() check whether the URL has a non-empty scheme. Besides the non-empty scheme, we also require uri has a non-empty host - if ref == nil && uri != nil && (!uri.URL().IsAbs() || uri.Host == "") { - return apis.ErrInvalidValue("Relative URI is not allowed when Ref and [apiVersion, kind, name] is absent", "uri") - } - if ref != nil && uri == nil { - return ref.Validate(ctx).ViaField("ref") - } - return nil -} - -func validateKReferenceWithGroupExperimentalFeatureEnabled(ctx context.Context, kr *duckv1.KReference) *apis.FieldError { - var errs *apis.FieldError - if kr == nil { - return errs.Also(apis.ErrMissingField("name")). - Also(apis.ErrMissingField("apiVersion")). - Also(apis.ErrMissingField("kind")) - } - if kr.Name == "" { - errs = errs.Also(apis.ErrMissingField("name")) - } - if kr.APIVersion == "" && kr.Group == "" { - errs = errs.Also(apis.ErrMissingField("apiVersion")). - Also(apis.ErrMissingField("group")) - } - if kr.APIVersion != "" && kr.Group != "" { - errs = errs.Also(&apis.FieldError{ - Message: "both apiVersion and group are specified", - Paths: []string{"apiVersion", "group"}, - Details: "Only one of them must be specified", - }) - } - if kr.Kind == "" { - errs = errs.Also(apis.ErrMissingField("kind")) - } - // Only if namespace is empty validate it. This is to deal with legacy - // objects in the storage that may now have the namespace filled in. - // Because things get defaulted in other cases, moving forward the - // kr.Namespace will not be empty. - if kr.Namespace != "" { - if !apis.IsDifferentNamespaceAllowed(ctx) { - parentNS := apis.ParentMeta(ctx).Namespace - if parentNS != "" && kr.Namespace != parentNS { - errs = errs.Also(&apis.FieldError{ - Message: "mismatched namespaces", - Paths: []string{"namespace"}, - Details: fmt.Sprintf("parent namespace: %q does not match ref: %q", parentNS, kr.Namespace), - }) - } - } - } - - return errs -} diff --git a/pkg/apis/messaging/v1/subscription_validation_test.go b/pkg/apis/messaging/v1/subscription_validation_test.go index c8781e5be48..d0ea1b55af4 100644 --- a/pkg/apis/messaging/v1/subscription_validation_test.go +++ b/pkg/apis/messaging/v1/subscription_validation_test.go @@ -387,39 +387,6 @@ func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { }, }, want: apis.ErrMissingField("reply.ref.name"), - }, { - name: "both api version and group are empty", - c: &SubscriptionSpec{ - Channel: getValidChannelRef(), - Subscriber: getValidDestination(), - Reply: &duckv1.Destination{ - Ref: &duckv1.KReference{ - Namespace: namespace, - Name: "abc", - Kind: channelKind, - }, - }, - }, - want: apis.ErrMissingField("reply.ref.apiVersion").Also(apis.ErrMissingField("reply.ref.group")), - }, { - name: "both api version and group are there", - c: &SubscriptionSpec{ - Channel: getValidChannelRef(), - Subscriber: getValidDestination(), - Reply: &duckv1.Destination{ - Ref: &duckv1.KReference{ - Namespace: namespace, - Name: "abc", - Kind: channelKind, - Group: "myGroup", - }, - }, - }, - want: &apis.FieldError{ - Message: "both apiVersion and group are specified", - Paths: []string{"reply.ref.apiVersion", "reply.ref.group"}, - Details: "Only one of them must be specified", - }, }} for _, test := range tests { From 0900ce12305ed7dc23f0f8f379f30341a207a09e Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 19 May 2021 11:09:42 +0200 Subject: [PATCH 05/23] Some todos Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/subscription.go | 3 +- .../subscription/subscription_test.go | 80 +++++++++++++++++++ pkg/reconciler/testing/v1/subscription.go | 13 +++ 3 files changed, 94 insertions(+), 2 deletions(-) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index ab3514b323c..a043cd08ece 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -22,7 +22,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -208,7 +207,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub } // Resolve the group - if subscriber.Ref != nil && true /* feature enabled */ { + if subscriber.Ref != nil && true /* TODO feature enabled */ { err := subscriber.Ref.ResolveGroup(r.crdLister) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 5c887763cab..703e5f07c1a 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -23,6 +23,7 @@ import ( "testing" "k8s.io/utils/pointer" + "knative.dev/eventing/pkg/apis/messaging" "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/network" @@ -46,6 +47,7 @@ import ( messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" "knative.dev/eventing/pkg/duck" + eventingtesting "knative.dev/eventing/pkg/reconciler/testing" . "knative.dev/pkg/reconciler/testing" @@ -205,6 +207,83 @@ func TestAllCases(t *testing.T) { MarkSubscriptionReady, ), }}, + }, { + Name: "subscription goes ready without api version", + Ctx: context.TODO(), // TODO add context + Objects: []runtime.Object{ + NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + MarkReferencesResolved, + MarkAddedToChannel, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + ), + // Subscriber + NewUnstructured(subscriberGVK, subscriberName, testNS, + WithUnstructuredAddressable(subscriberDNS), + ), + // Reply + NewInMemoryChannel(replyName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelAddress(replyDNS), + ), + // Channel + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelReady(channelDNS), + WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{ + UID: subscriptionUID, + Generation: 0, + SubscriberURI: subscriberURI, + ReplyURI: replyURI, + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: apis.HTTP("call2"), + ReplyURI: apis.HTTP("sink2"), + }}), + WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{ + UID: subscriptionUID, + ObservedGeneration: 0, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }}), + ), + // IMC CRD + eventingtesting.NewCustomResourceDefinition(messaging.InMemoryChannelsResource.String(), + eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1beta1", + Storage: false, + }, { + Name: "v1", + Storage: true, + }}), + ), + }, + Key: testNS + "/" + subscriptionName, + WantErr: false, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewSubscription(subscriptionName, testNS, + WithSubscriptionUID(subscriptionUID), + WithSubscriptionChannel(imcV1GVK, channelName), + WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionReply(imcV1GVK, replyName, testNS), + WithInitSubscriptionConditions, + WithSubscriptionFinalizers(finalizerName), + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), + WithSubscriptionPhysicalSubscriptionReply(replyURI), + // - Status Update - + MarkSubscriptionReady, + ), + }}, }, { Name: "channel does not exist", Objects: []runtime.Object{ @@ -1368,6 +1447,7 @@ func TestAllCases(t *testing.T) { channelLister: listers.GetMessagingChannelLister(), channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0), destinationResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), + crdLister: listers.GetCustomResourceDefinitionLister(), tracker: &FakeTracker{}, } return subscription.NewReconciler(ctx, logger, diff --git a/pkg/reconciler/testing/v1/subscription.go b/pkg/reconciler/testing/v1/subscription.go index b4dd4083a35..2d665b254f9 100644 --- a/pkg/reconciler/testing/v1/subscription.go +++ b/pkg/reconciler/testing/v1/subscription.go @@ -138,6 +138,19 @@ func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace } } +func WithSubscriptionSubscriberRefUsingGroup(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { + return func(s *v1.Subscription) { + s.Spec.Subscriber = &duckv1.Destination{ + Ref: &duckv1.KReference{ + Group: gvk.Group, + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + } + } +} + func WithSubscriptionDeliveryRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption { return func(s *v1.Subscription) { s.Spec.Delivery = &eventingduckv1.DeliverySpec{ From e0e833341a0cd2e0ee2ff2d313de935cf74c2ce2 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 24 May 2021 10:10:11 +0200 Subject: [PATCH 06/23] WIP Signed-off-by: Francesco Guardiani --- pkg/apis/feature/flag_names.go | 6 +- .../messaging/v1/subscription_validation.go | 3 +- pkg/reconciler/subscription/subscription.go | 3 +- .../kreference_group/channel_to_channel.go | 78 +++++++++++++++++++ 4 files changed, 87 insertions(+), 3 deletions(-) create mode 100644 test/experimental/features/kreference_group/channel_to_channel.go diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index 5d7b554ea45..02f87d768c5 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -14,4 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. */ -package feature +package experimental + +const ( + KReferenceGroup = "kreference-group" +) diff --git a/pkg/apis/messaging/v1/subscription_validation.go b/pkg/apis/messaging/v1/subscription_validation.go index 5b8fbc5eadd..bdbfcdc556e 100644 --- a/pkg/apis/messaging/v1/subscription_validation.go +++ b/pkg/apis/messaging/v1/subscription_validation.go @@ -21,6 +21,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/api/equality" + "knative.dev/eventing/pkg/apis/experimental" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmp" @@ -58,7 +59,7 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { if !missingSubscriber { ctx := ctx - if true /* experimental feature enable */ { + if experimental.FromContext(ctx).IsEnabled(experimental.KReferenceGroup) { ctx = duckv1.KReferenceGroupAllowed(ctx) } if fe := ss.Subscriber.Validate(ctx); fe != nil { diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index a043cd08ece..8d5513ba787 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" + "knative.dev/eventing/pkg/apis/experimental" "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -207,7 +208,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub } // Resolve the group - if subscriber.Ref != nil && true /* TODO feature enabled */ { + if subscriber.Ref != nil && experimental.FromContext(ctx).IsEnabled(experimental.KReferenceGroup) { err := subscriber.Ref.ResolveGroup(r.crdLister) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", diff --git a/test/experimental/features/kreference_group/channel_to_channel.go b/test/experimental/features/kreference_group/channel_to_channel.go new file mode 100644 index 00000000000..9a52effc9e6 --- /dev/null +++ b/test/experimental/features/kreference_group/channel_to_channel.go @@ -0,0 +1,78 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kreference_group + +import ( + "context" + + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/feature" +) + +// ChannelToChannel tests a scenario where the flow is source -> A -> B -> sink +// and A -> B subscription uses the KReference.Group field. +func ChannelToChannel() *feature.Feature { + f := feature.NewFeature() + + channelAName := feature.MakeRandomK8sName("channel-a") + channelBName := feature.MakeRandomK8sName("channel-b") + subAToBName := feature.MakeRandomK8sName("sub-a-b") + subBToSinkName := feature.MakeRandomK8sName("sub-b-sink") + sinkName := feature.MakeRandomK8sName("sink") + sourceName := feature.MakeRandomK8sName("source") + + f.Setup("install sink", eventshub.Install( + sinkName, + eventshub.StartReceiver, + )) + + f.Setup("Install channel A", installInMemoryChannel(channelAName)) + f.Setup("Install channel B", installInMemoryChannel(channelBName)) + f.Setup("Install channel B -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subAToBName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) +} + +func installInMemoryChannel(channelName string) feature.StepFn { + return func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().InMemoryChannels(namespace).Create(ctx, + &messagingv1.InMemoryChannel{ + ObjectMeta: metav1.ObjectMeta{ + Name: channelName, + Namespace: namespace, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + } +} \ No newline at end of file From d955c7dd998db1e0f95b50d351b8b946b56b59e5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 24 May 2021 10:50:17 +0200 Subject: [PATCH 07/23] Added test Signed-off-by: Francesco Guardiani --- .../kreference_group/channel_to_channel.go | 72 ++++++++++++++++++- test/experimental/kreference_group_test.go | 43 +++++++++++ 2 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 test/experimental/kreference_group_test.go diff --git a/test/experimental/features/kreference_group/channel_to_channel.go b/test/experimental/features/kreference_group/channel_to_channel.go index 9a52effc9e6..5823f7d2f3d 100644 --- a/test/experimental/features/kreference_group/channel_to_channel.go +++ b/test/experimental/features/kreference_group/channel_to_channel.go @@ -19,12 +19,18 @@ package kreference_group import ( "context" + cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" + "knative.dev/eventing/test/rekt/resources/channel" + "knative.dev/eventing/test/rekt/resources/subscription" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" + "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" ) @@ -33,6 +39,11 @@ import ( func ChannelToChannel() *feature.Feature { f := feature.NewFeature() + imcGVK := (&messagingv1.InMemoryChannel{}).GetGroupVersionKind() + imcGVR, _ := meta.UnsafeGuessKindToResource(imcGVK) + imcGroup := imcGVK.GroupKind().Group + imcAPIVersion, imcKind := imcGVK.ToAPIVersionAndKind() + channelAName := feature.MakeRandomK8sName("channel-a") channelBName := feature.MakeRandomK8sName("channel-b") subAToBName := feature.MakeRandomK8sName("sub-a-b") @@ -40,6 +51,8 @@ func ChannelToChannel() *feature.Feature { sinkName := feature.MakeRandomK8sName("sink") sourceName := feature.MakeRandomK8sName("source") + ev := cetest.FullEvent() + f.Setup("install sink", eventshub.Install( sinkName, eventshub.StartReceiver, @@ -47,7 +60,7 @@ func ChannelToChannel() *feature.Feature { f.Setup("Install channel A", installInMemoryChannel(channelAName)) f.Setup("Install channel B", installInMemoryChannel(channelBName)) - f.Setup("Install channel B -> sink subscription", func(ctx context.Context, t feature.T) { + f.Setup("Install channel A -> channel B subscription", func(ctx context.Context, t feature.T) { namespace := environment.FromContext(ctx).Namespace() _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, &messagingv1.Subscription{ @@ -56,11 +69,66 @@ func ChannelToChannel() *feature.Feature { Namespace: namespace, }, Spec: messagingv1.SubscriptionSpec{ - Channel: + Channel: duckv1.KReference{ + APIVersion: imcAPIVersion, + Kind: imcKind, + Namespace: namespace, + Name: channelAName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Group: imcGroup, + Kind: imcKind, + Namespace: namespace, + Name: channelBName, + }, + }, + }, + }, metav1.CreateOptions{}) + require.NoError(t, err) + }) + f.Setup("Install channel B -> sink subscription", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, + &messagingv1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subBToSinkName, + Namespace: namespace, + }, + Spec: messagingv1.SubscriptionSpec{ + Channel: duckv1.KReference{ + APIVersion: imcAPIVersion, + Kind: imcKind, + Namespace: namespace, + Name: channelBName, + }, + Subscriber: &duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "Service", + Namespace: namespace, + Name: sinkName, + }, + }, }, }, metav1.CreateOptions{}) require.NoError(t, err) }) + + f.Setup("channel A is ready", channel.IsReady(channelAName)) + f.Setup("channel B is ready", channel.IsReady(channelBName)) + f.Setup("subscription A -> B is ready", subscription.IsReady(subAToBName)) + f.Setup("subscription B -> Sink is ready", subscription.IsReady(subBToSinkName)) + + f.Setup("install source", eventshub.Install( + sourceName, + eventshub.StartSenderToResource(imcGVR, channelAName), + eventshub.InputEvent(ev), + )) + + f.Assert("receive event", assert.OnStore(sinkName).MatchEvent(cetest.IsEqualTo(ev)).Exact(1)) + + return f } func installInMemoryChannel(channelName string) feature.StepFn { diff --git a/test/experimental/kreference_group_test.go b/test/experimental/kreference_group_test.go new file mode 100644 index 00000000000..99254616243 --- /dev/null +++ b/test/experimental/kreference_group_test.go @@ -0,0 +1,43 @@ +// +build e2e + +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package experimental + +import ( + "testing" + + "knative.dev/eventing/test/experimental/features/kreference_group" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" +) + +func TestChannelToChannel(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, kreference_group.ChannelToChannel()) +} From 54c097f69ac6a4ab2f6d6d4135d1facfd3e7ecf7 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 27 May 2021 10:39:07 +0200 Subject: [PATCH 08/23] Some fixes Signed-off-by: Francesco Guardiani --- .../messaging/v1/subscription_validation_test.go | 12 ++++++++---- pkg/reconciler/subscription/controller_test.go | 1 + pkg/reconciler/subscription/subscription_test.go | 10 ++++++---- pkg/reconciler/testing/v1/factory.go | 7 ++++++- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/pkg/apis/messaging/v1/subscription_validation_test.go b/pkg/apis/messaging/v1/subscription_validation_test.go index d0ea1b55af4..b4c819614b6 100644 --- a/pkg/apis/messaging/v1/subscription_validation_test.go +++ b/pkg/apis/messaging/v1/subscription_validation_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "knative.dev/eventing/pkg/apis/experimental" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -266,7 +267,7 @@ func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { }, { name: "empty Channel", c: &SubscriptionSpec{ - Channel: corev1.ObjectReference{}, + Channel: duckv1.KReference{}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channel") @@ -276,7 +277,7 @@ func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { }, { name: "missing name in Channel", c: &SubscriptionSpec{ - Channel: corev1.ObjectReference{ + Channel: duckv1.KReference{ Kind: channelKind, APIVersion: channelAPIVersion, }, @@ -341,7 +342,7 @@ func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { }, { name: "missing name in channel, and missing subscriber, reply", c: &SubscriptionSpec{ - Channel: corev1.ObjectReference{ + Channel: duckv1.KReference{ Kind: channelKind, APIVersion: channelAPIVersion, }, @@ -391,7 +392,10 @@ func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got := test.c.Validate(context.TODO()) + ctx := experimental.ToContext(context.TODO(), experimental.Flags{ + experimental.KReferenceGroup: true, + }) + got := test.c.Validate(ctx) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { t.Errorf("%s: validateChannel (-want, +got) = %v", test.name, diff) } diff --git a/pkg/reconciler/subscription/controller_test.go b/pkg/reconciler/subscription/controller_test.go index 01d961ce950..1addb7fcf26 100644 --- a/pkg/reconciler/subscription/controller_test.go +++ b/pkg/reconciler/subscription/controller_test.go @@ -26,6 +26,7 @@ import ( _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" + _ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake" _ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake" ) diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 703e5f07c1a..f90b9da435c 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -23,7 +23,7 @@ import ( "testing" "k8s.io/utils/pointer" - "knative.dev/eventing/pkg/apis/messaging" + "knative.dev/eventing/pkg/apis/experimental" "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/network" @@ -209,7 +209,9 @@ func TestAllCases(t *testing.T) { }}, }, { Name: "subscription goes ready without api version", - Ctx: context.TODO(), // TODO add context + Ctx: experimental.ToContext(context.TODO(), experimental.Flags{ + experimental.KReferenceGroup: true, + }), Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), @@ -258,7 +260,7 @@ func TestAllCases(t *testing.T) { }}), ), // IMC CRD - eventingtesting.NewCustomResourceDefinition(messaging.InMemoryChannelsResource.String(), + eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev", eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{ Name: "v1beta1", Storage: false, @@ -274,7 +276,7 @@ func TestAllCases(t *testing.T) { Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), WithSubscriptionChannel(imcV1GVK, channelName), - WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS), + WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS), WithSubscriptionReply(imcV1GVK, replyName, testNS), WithInitSubscriptionConditions, WithSubscriptionFinalizers(finalizerName), diff --git a/pkg/reconciler/testing/v1/factory.go b/pkg/reconciler/testing/v1/factory.go index d384bfd62b4..753d0644d69 100644 --- a/pkg/reconciler/testing/v1/factory.go +++ b/pkg/reconciler/testing/v1/factory.go @@ -58,7 +58,12 @@ func MakeFactory(ctor Ctor, unstructured bool, logger *zap.SugaredLogger) Factor return func(t *testing.T, r *TableRow) (controller.Reconciler, ActionRecorderList, EventList) { ls := NewListers(r.Objects) - ctx := context.Background() + var ctx context.Context + if r.Ctx != nil { + ctx = r.Ctx + } else { + ctx = context.Background() + } ctx = logging.WithLogger(ctx, logger) ctx, kubeClient := fakekubeclient.With(ctx, ls.GetKubeObjects()...) From 867ea83bc12eaac55624205783c8eb7faed19afa Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Thu, 27 May 2021 11:45:55 +0200 Subject: [PATCH 09/23] Now everything works Signed-off-by: Francesco Guardiani --- cmd/webhook/main.go | 17 ++++- config/core/configmaps/features.yaml | 2 + pkg/reconciler/subscription/controller.go | 13 ++-- pkg/reconciler/subscription/subscription.go | 6 ++ .../kreference_group/channel_to_channel.go | 66 +++++++------------ 5 files changed, 55 insertions(+), 49 deletions(-) diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 5bb6e370b75..e5abe435c32 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing/pkg/apis/experimental" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -91,9 +92,12 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) + experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) + experimentalStore.WatchConfigs(cmw) + // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(store.ToContext(ctx)) + return experimentalStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) } return defaulting.NewAdmissionController(ctx, @@ -125,9 +129,13 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher pingstore := pingdefaultconfig.NewStore(logging.FromContext(ctx).Named("ping-config-store")) pingstore.WatchConfigs(cmw) + + experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) + experimentalStore.WatchConfigs(cmw) + // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx))) + return experimentalStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx)))) } return validation.NewAdmissionController(ctx, @@ -201,9 +209,12 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) + experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) + experimentalStore.WatchConfigs(cmw) + // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return channelStore.ToContext(store.ToContext(ctx)) + return experimentalStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) } var ( diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index 36fc042ffd7..ec77d7eaa5d 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -22,3 +22,5 @@ metadata: knative.dev/config-propagation: original knative.dev/config-category: eventing data: + # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. + kreference-group: "false" \ No newline at end of file diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 65737db8939..428d8494ce9 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -19,6 +19,7 @@ package subscription import ( "context" + "knative.dev/eventing/pkg/apis/experimental" "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -45,11 +46,15 @@ func NewController( subscriptionInformer := subscription.Get(ctx) channelInformer := channel.Get(ctx) + experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) + experimentalStore.WatchConfigs(cmw) + r := &Reconciler{ - dynamicClientSet: dynamicclient.Get(ctx), - crdLister: customresourcedefinition.Get(ctx).Lister(), - subscriptionLister: subscriptionInformer.Lister(), - channelLister: channelInformer.Lister(), + dynamicClientSet: dynamicclient.Get(ctx), + crdLister: customresourcedefinition.Get(ctx).Lister(), + subscriptionLister: subscriptionInformer.Lister(), + channelLister: channelInformer.Lister(), + experimentalFlagsStore: experimentalStore, } impl := subscriptionreconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 8d5513ba787..8a4b8638eaf 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -73,6 +73,8 @@ type Reconciler struct { channelableTracker eventingduck.ListableTracker destinationResolver *resolver.URIResolver tracker tracker.Interface + + experimentalFlagsStore *experimental.Store } // Check that our Reconciler implements Interface @@ -83,6 +85,9 @@ var _ subscriptionreconciler.Finalizer = (*Reconciler)(nil) // ReconcileKind implements Interface.ReconcileKind. func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { + // Populate this context with experimental flags + ctx = r.experimentalFlagsStore.ToContext(ctx) + // Find the channel for this subscription. channel, err := r.getChannel(ctx, subscription) if err != nil { @@ -217,6 +222,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub subscription.Status.MarkReferencesNotResolved(subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %v", err) return pkgreconciler.NewEvent(corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %w", err) } + logging.FromContext(ctx).Debugw("Group resolved", zap.Any("spec.subscriber.ref", subscriber.Ref)) } subscriberURI, err := r.destinationResolver.URIFromDestinationV1(ctx, *subscriber, subscription) diff --git a/test/experimental/features/kreference_group/channel_to_channel.go b/test/experimental/features/kreference_group/channel_to_channel.go index 5823f7d2f3d..31668571d6e 100644 --- a/test/experimental/features/kreference_group/channel_to_channel.go +++ b/test/experimental/features/kreference_group/channel_to_channel.go @@ -21,7 +21,6 @@ import ( cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/stretchr/testify/require" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" eventingclient "knative.dev/eventing/pkg/client/injection/client" @@ -39,10 +38,10 @@ import ( func ChannelToChannel() *feature.Feature { f := feature.NewFeature() - imcGVK := (&messagingv1.InMemoryChannel{}).GetGroupVersionKind() - imcGVR, _ := meta.UnsafeGuessKindToResource(imcGVK) - imcGroup := imcGVK.GroupKind().Group - imcAPIVersion, imcKind := imcGVK.ToAPIVersionAndKind() + channelGVK := channel.GVK() + channelGVR := channel.GVR() + channelGroup := channelGVK.GroupKind().Group + channelAPIVersion, imcKind := channelGVK.ToAPIVersionAndKind() channelAName := feature.MakeRandomK8sName("channel-a") channelBName := feature.MakeRandomK8sName("channel-b") @@ -58,29 +57,30 @@ func ChannelToChannel() *feature.Feature { eventshub.StartReceiver, )) - f.Setup("Install channel A", installInMemoryChannel(channelAName)) - f.Setup("Install channel B", installInMemoryChannel(channelBName)) + f.Setup("Install channel A", channel.Install(channelAName)) + f.Setup("Install channel B", channel.Install(channelBName)) + f.Setup("channel A is ready", channel.IsReady(channelAName)) + f.Setup("channel B is ready", channel.IsReady(channelBName)) + f.Setup("Install channel A -> channel B subscription", func(ctx context.Context, t feature.T) { namespace := environment.FromContext(ctx).Namespace() _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, &messagingv1.Subscription{ ObjectMeta: metav1.ObjectMeta{ - Name: subAToBName, + Name: subAToBName, Namespace: namespace, }, Spec: messagingv1.SubscriptionSpec{ Channel: duckv1.KReference{ - APIVersion: imcAPIVersion, - Kind: imcKind, - Namespace: namespace, - Name: channelAName, + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: channelAName, }, Subscriber: &duckv1.Destination{ Ref: &duckv1.KReference{ - Group: imcGroup, - Kind: imcKind, - Namespace: namespace, - Name: channelBName, + Group: channelGroup, + Kind: imcKind, + Name: channelBName, }, }, }, @@ -92,22 +92,20 @@ func ChannelToChannel() *feature.Feature { _, err := eventingclient.Get(ctx).MessagingV1().Subscriptions(namespace).Create(ctx, &messagingv1.Subscription{ ObjectMeta: metav1.ObjectMeta{ - Name: subBToSinkName, + Name: subBToSinkName, Namespace: namespace, }, Spec: messagingv1.SubscriptionSpec{ Channel: duckv1.KReference{ - APIVersion: imcAPIVersion, - Kind: imcKind, - Namespace: namespace, - Name: channelBName, + APIVersion: channelAPIVersion, + Kind: imcKind, + Name: channelBName, }, Subscriber: &duckv1.Destination{ Ref: &duckv1.KReference{ APIVersion: "v1", - Kind: "Service", - Namespace: namespace, - Name: sinkName, + Kind: "Service", + Name: sinkName, }, }, }, @@ -115,32 +113,16 @@ func ChannelToChannel() *feature.Feature { require.NoError(t, err) }) - f.Setup("channel A is ready", channel.IsReady(channelAName)) - f.Setup("channel B is ready", channel.IsReady(channelBName)) f.Setup("subscription A -> B is ready", subscription.IsReady(subAToBName)) f.Setup("subscription B -> Sink is ready", subscription.IsReady(subBToSinkName)) f.Setup("install source", eventshub.Install( sourceName, - eventshub.StartSenderToResource(imcGVR, channelAName), + eventshub.StartSenderToResource(channelGVR, channelAName), eventshub.InputEvent(ev), )) - f.Assert("receive event", assert.OnStore(sinkName).MatchEvent(cetest.IsEqualTo(ev)).Exact(1)) + f.Assert("receive event", assert.OnStore(sinkName).MatchEvent(cetest.HasId(ev.ID())).Exact(1)) return f } - -func installInMemoryChannel(channelName string) feature.StepFn { - return func(ctx context.Context, t feature.T) { - namespace := environment.FromContext(ctx).Namespace() - _, err := eventingclient.Get(ctx).MessagingV1().InMemoryChannels(namespace).Create(ctx, - &messagingv1.InMemoryChannel{ - ObjectMeta: metav1.ObjectMeta{ - Name: channelName, - Namespace: namespace, - }, - }, metav1.CreateOptions{}) - require.NoError(t, err) - } -} \ No newline at end of file From 022e11d2b918004d5db2cf2e7291262b0c78239b Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 1 Jun 2021 11:17:21 +0200 Subject: [PATCH 10/23] Rebase Signed-off-by: Francesco Guardiani --- cmd/webhook/main.go | 20 +++++++++---------- pkg/apis/feature/flag_names.go | 2 +- .../messaging/v1/subscription_validation.go | 4 ++-- .../v1/subscription_validation_test.go | 6 +++--- pkg/reconciler/subscription/controller.go | 16 +++++++-------- pkg/reconciler/subscription/subscription.go | 8 ++++---- .../subscription/subscription_test.go | 6 +++--- 7 files changed, 31 insertions(+), 31 deletions(-) diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index e5abe435c32..4c2a4a8035b 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -22,7 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/apis/experimental" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -92,12 +92,12 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) - experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) - experimentalStore.WatchConfigs(cmw) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return experimentalStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) + return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) } return defaulting.NewAdmissionController(ctx, @@ -130,12 +130,12 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher pingstore := pingdefaultconfig.NewStore(logging.FromContext(ctx).Named("ping-config-store")) pingstore.WatchConfigs(cmw) - experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) - experimentalStore.WatchConfigs(cmw) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return experimentalStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx)))) + return featureStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx)))) } return validation.NewAdmissionController(ctx, @@ -209,12 +209,12 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store")) channelStore.WatchConfigs(cmw) - experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) - experimentalStore.WatchConfigs(cmw) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) // Decorate contexts with the current state of the config. ctxFunc := func(ctx context.Context) context.Context { - return experimentalStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) + return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx))) } var ( diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index 02f87d768c5..fdc49d7e144 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package experimental +package feature const ( KReferenceGroup = "kreference-group" diff --git a/pkg/apis/messaging/v1/subscription_validation.go b/pkg/apis/messaging/v1/subscription_validation.go index bdbfcdc556e..5f70612c2f4 100644 --- a/pkg/apis/messaging/v1/subscription_validation.go +++ b/pkg/apis/messaging/v1/subscription_validation.go @@ -21,7 +21,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/api/equality" - "knative.dev/eventing/pkg/apis/experimental" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmp" @@ -59,7 +59,7 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { if !missingSubscriber { ctx := ctx - if experimental.FromContext(ctx).IsEnabled(experimental.KReferenceGroup) { + if feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { ctx = duckv1.KReferenceGroupAllowed(ctx) } if fe := ss.Subscriber.Validate(ctx); fe != nil { diff --git a/pkg/apis/messaging/v1/subscription_validation_test.go b/pkg/apis/messaging/v1/subscription_validation_test.go index b4c819614b6..e2871f71a93 100644 --- a/pkg/apis/messaging/v1/subscription_validation_test.go +++ b/pkg/apis/messaging/v1/subscription_validation_test.go @@ -20,7 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" - "knative.dev/eventing/pkg/apis/experimental" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -392,8 +392,8 @@ func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ctx := experimental.ToContext(context.TODO(), experimental.Flags{ - experimental.KReferenceGroup: true, + ctx := feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Allowed, }) got := test.c.Validate(ctx) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 428d8494ce9..6e501e1e279 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -19,7 +19,7 @@ package subscription import ( "context" - "knative.dev/eventing/pkg/apis/experimental" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -46,15 +46,15 @@ func NewController( subscriptionInformer := subscription.Get(ctx) channelInformer := channel.Get(ctx) - experimentalStore := experimental.NewStore(logging.FromContext(ctx).Named("experimental-config-store")) - experimentalStore.WatchConfigs(cmw) + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + featureStore.WatchConfigs(cmw) r := &Reconciler{ - dynamicClientSet: dynamicclient.Get(ctx), - crdLister: customresourcedefinition.Get(ctx).Lister(), - subscriptionLister: subscriptionInformer.Lister(), - channelLister: channelInformer.Lister(), - experimentalFlagsStore: experimentalStore, + dynamicClientSet: dynamicclient.Get(ctx), + crdLister: customresourcedefinition.Get(ctx).Lister(), + subscriptionLister: subscriptionInformer.Lister(), + channelLister: channelInformer.Lister(), + featureFlagsStore: featureStore, } impl := subscriptionreconciler.NewImpl(ctx, r) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 8a4b8638eaf..5b25b4500c3 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -29,7 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "knative.dev/eventing/pkg/apis/experimental" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -74,7 +74,7 @@ type Reconciler struct { destinationResolver *resolver.URIResolver tracker tracker.Interface - experimentalFlagsStore *experimental.Store + featureFlagsStore *feature.Store } // Check that our Reconciler implements Interface @@ -86,7 +86,7 @@ var _ subscriptionreconciler.Finalizer = (*Reconciler)(nil) // ReconcileKind implements Interface.ReconcileKind. func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { // Populate this context with experimental flags - ctx = r.experimentalFlagsStore.ToContext(ctx) + ctx = r.featureFlagsStore.ToContext(ctx) // Find the channel for this subscription. channel, err := r.getChannel(ctx, subscription) @@ -213,7 +213,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub } // Resolve the group - if subscriber.Ref != nil && experimental.FromContext(ctx).IsEnabled(experimental.KReferenceGroup) { + if subscriber.Ref != nil && feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { err := subscriber.Ref.ResolveGroup(r.crdLister) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index f90b9da435c..e9143e63939 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -23,7 +23,7 @@ import ( "testing" "k8s.io/utils/pointer" - "knative.dev/eventing/pkg/apis/experimental" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/network" @@ -209,8 +209,8 @@ func TestAllCases(t *testing.T) { }}, }, { Name: "subscription goes ready without api version", - Ctx: experimental.ToContext(context.TODO(), experimental.Flags{ - experimental.KReferenceGroup: true, + Ctx: feature.ToContext(context.TODO(), feature.Flags{ + feature.KReferenceGroup: feature.Allowed, }), Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, From 220daa24cdc20ff2a189a0abd770fa8607808bf8 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 8 Jun 2021 08:29:29 +0200 Subject: [PATCH 11/23] Fix leftover Signed-off-by: Francesco Guardiani --- config/core/configmaps/features.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index ec77d7eaa5d..a0f8c8ff734 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -23,4 +23,4 @@ metadata: knative.dev/config-category: eventing data: # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. - kreference-group: "false" \ No newline at end of file + kreference-group: "disabled" \ No newline at end of file From 1f4240bd105f422fd9b31bf3a0c548dd52410586 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 8 Jun 2021 08:54:03 +0200 Subject: [PATCH 12/23] Changes from pkg Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/subscription.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 5b25b4500c3..44f925db6de 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -33,6 +33,7 @@ import ( "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/kref" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/pkg/resolver" @@ -214,7 +215,8 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub // Resolve the group if subscriber.Ref != nil && feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { - err := subscriber.Ref.ResolveGroup(r.crdLister) + var err error + subscriber.Ref, err = kref.ResolveGroup(subscriber.Ref, r.crdLister) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", zap.Error(err), From 01c3a69120c893bf8f0626c0ebbff4c2f19a4bb5 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 8 Jun 2021 09:39:36 +0200 Subject: [PATCH 13/23] Using configStore Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/controller.go | 7 +++++-- pkg/reconciler/subscription/subscription.go | 7 +------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 6e501e1e279..7cbfb690bd8 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -54,9 +54,12 @@ func NewController( crdLister: customresourcedefinition.Get(ctx).Lister(), subscriptionLister: subscriptionInformer.Lister(), channelLister: channelInformer.Lister(), - featureFlagsStore: featureStore, } - impl := subscriptionreconciler.NewImpl(ctx, r) + impl := subscriptionreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) logging.FromContext(ctx).Info("Setting up event handlers") subscriptionInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 44f925db6de..e9062ca67b1 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/dynamic" - "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis/duck" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -40,6 +39,7 @@ import ( "knative.dev/pkg/tracker" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/messaging/v1" subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription" listers "knative.dev/eventing/pkg/client/listers/messaging/v1" @@ -74,8 +74,6 @@ type Reconciler struct { channelableTracker eventingduck.ListableTracker destinationResolver *resolver.URIResolver tracker tracker.Interface - - featureFlagsStore *feature.Store } // Check that our Reconciler implements Interface @@ -86,9 +84,6 @@ var _ subscriptionreconciler.Finalizer = (*Reconciler)(nil) // ReconcileKind implements Interface.ReconcileKind. func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event { - // Populate this context with experimental flags - ctx = r.featureFlagsStore.ToContext(ctx) - // Find the channel for this subscription. channel, err := r.getChannel(ctx, subscription) if err != nil { From 79847bf31557e7e81c917ea2fcd805188e2555f0 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 8 Jun 2021 10:10:01 +0200 Subject: [PATCH 14/23] Fill the context with proper context decorators of specific features Signed-off-by: Francesco Guardiani --- pkg/apis/feature/store.go | 13 ++++++++++++- pkg/apis/messaging/v1/subscription_validation.go | 5 ----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/pkg/apis/feature/store.go b/pkg/apis/feature/store.go index cfa78baf9fb..22a65e7d981 100644 --- a/pkg/apis/feature/store.go +++ b/pkg/apis/feature/store.go @@ -19,6 +19,7 @@ package feature import ( "context" + duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/configmap" ) @@ -77,7 +78,10 @@ func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value i // ToContext attaches the current Config state to the provided context. func (s *Store) ToContext(ctx context.Context) context.Context { - return ToContext(ctx, s.Load()) + flags := s.Load() + ctx = ToContext(ctx, flags) + ctx = fillContextWithFeatureSpecificFlags(ctx, flags) + return ctx } // IsEnabled is a shortcut for Load().IsEnabled(featureName) @@ -98,3 +102,10 @@ func (s *Store) Load() Flags { } return loaded.(Flags) } + +func fillContextWithFeatureSpecificFlags(ctx context.Context, flags Flags) context.Context { + if flags.IsEnabled(KReferenceGroup) { + ctx = duckv1.KReferenceGroupAllowed(ctx) + } + return ctx +} diff --git a/pkg/apis/messaging/v1/subscription_validation.go b/pkg/apis/messaging/v1/subscription_validation.go index 5f70612c2f4..93bcdf8c0ea 100644 --- a/pkg/apis/messaging/v1/subscription_validation.go +++ b/pkg/apis/messaging/v1/subscription_validation.go @@ -21,7 +21,6 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "k8s.io/apimachinery/pkg/api/equality" - "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmp" @@ -58,10 +57,6 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { } if !missingSubscriber { - ctx := ctx - if feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { - ctx = duckv1.KReferenceGroupAllowed(ctx) - } if fe := ss.Subscriber.Validate(ctx); fe != nil { errs = errs.Also(fe.ViaField("subscriber")) } From b86c6f4e1030d113a59dc812311dd6ef2962c79c Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 9 Jun 2021 12:17:32 +0200 Subject: [PATCH 15/23] Use new way to configure tests Signed-off-by: Francesco Guardiani --- test/experimental/config/features.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/test/experimental/config/features.yaml b/test/experimental/config/features.yaml index 36fc042ffd7..e5e32b6767e 100644 --- a/test/experimental/config/features.yaml +++ b/test/experimental/config/features.yaml @@ -22,3 +22,4 @@ metadata: knative.dev/config-propagation: original knative.dev/config-category: eventing data: + kreference-group: "enabled" \ No newline at end of file From 740187a0c2d8f2f3807c2b45af45859002a79036 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 14 Jun 2021 15:31:43 +0200 Subject: [PATCH 16/23] Link to details Signed-off-by: Francesco Guardiani --- config/core/configmaps/features.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index a0f8c8ff734..afa2324fc5b 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -23,4 +23,5 @@ metadata: knative.dev/config-category: eventing data: # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. + # For more details: https://github.com/knative/eventing/issues/5086 kreference-group: "disabled" \ No newline at end of file From d984d6d2fc795d67d811c94e2260bde50a69a0f0 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 14 Jun 2021 15:37:55 +0200 Subject: [PATCH 17/23] Newline Signed-off-by: Francesco Guardiani --- config/core/configmaps/features.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index afa2324fc5b..6856d9b3351 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -24,4 +24,4 @@ metadata: data: # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. # For more details: https://github.com/knative/eventing/issues/5086 - kreference-group: "disabled" \ No newline at end of file + kreference-group: "disabled" From 89b8da65d0cc73162a8e6207baa6758007908a95 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 15 Jun 2021 11:27:55 +0200 Subject: [PATCH 18/23] Remove hacky replace Signed-off-by: Francesco Guardiani --- go.mod | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.mod b/go.mod index cf13eaad9ed..af4930ef0df 100644 --- a/go.mod +++ b/go.mod @@ -48,5 +48,3 @@ require ( ) replace github.com/prometheus/client_golang => github.com/prometheus/client_golang v0.9.2 - -replace knative.dev/pkg => ../pkg From 18ddec0c4512db0c67714277bd6fda947fd8dee1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 15 Jun 2021 11:34:18 +0200 Subject: [PATCH 19/23] Updated with latest changes Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/controller.go | 3 ++- pkg/reconciler/subscription/subscription.go | 5 ++--- pkg/reconciler/subscription/subscription_test.go | 4 +++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/reconciler/subscription/controller.go b/pkg/reconciler/subscription/controller.go index 7cbfb690bd8..6483e4c7eee 100644 --- a/pkg/reconciler/subscription/controller.go +++ b/pkg/reconciler/subscription/controller.go @@ -23,6 +23,7 @@ import ( "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/kref" "knative.dev/pkg/logging" "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" @@ -51,7 +52,7 @@ func NewController( r := &Reconciler{ dynamicClientSet: dynamicclient.Get(ctx), - crdLister: customresourcedefinition.Get(ctx).Lister(), + kreferenceResolver: kref.NewKReferenceResolver(customresourcedefinition.Get(ctx).Lister()), subscriptionLister: subscriptionInformer.Lister(), channelLister: channelInformer.Lister(), } diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index e9062ca67b1..529047b4419 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -22,7 +22,6 @@ import ( "go.uber.org/zap" corev1 "k8s.io/api/core/v1" - apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -66,7 +65,7 @@ type Reconciler struct { dynamicClientSet dynamic.Interface // crdLister is used to resolve the ref version - crdLister apiextensionsv1lister.CustomResourceDefinitionLister + kreferenceResolver *kref.KReferenceResolver // listers index properties about resources subscriptionLister listers.SubscriptionLister @@ -211,7 +210,7 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub // Resolve the group if subscriber.Ref != nil && feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) { var err error - subscriber.Ref, err = kref.ResolveGroup(subscriber.Ref, r.crdLister) + subscriber.Ref, err = r.kreferenceResolver.ResolveGroup(subscriber.Ref) if err != nil { logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref", zap.Error(err), diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index e9143e63939..9526f4256e4 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -22,9 +22,11 @@ import ( "fmt" "testing" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/utils/pointer" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/injection/clients/dynamicclient" + "knative.dev/pkg/kref" "knative.dev/pkg/network" eventingclient "knative.dev/eventing/pkg/client/injection/client" @@ -1449,7 +1451,7 @@ func TestAllCases(t *testing.T) { channelLister: listers.GetMessagingChannelLister(), channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0), destinationResolver: resolver.NewURIResolver(ctx, func(types.NamespacedName) {}), - crdLister: listers.GetCustomResourceDefinitionLister(), + kreferenceResolver: kref.NewKReferenceResolver(listers.GetCustomResourceDefinitionLister()), tracker: &FakeTracker{}, } return subscription.NewReconciler(ctx, logger, From 539d8179ac630a5484fc289cb85aac568f5c9953 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 15 Jun 2021 14:43:42 +0200 Subject: [PATCH 20/23] Added knative_reference Signed-off-by: Francesco Guardiani --- .../knative.dev/pkg/kref/knative_reference.go | 77 +++++++++++++++++++ vendor/modules.txt | 1 + 2 files changed, 78 insertions(+) create mode 100644 vendor/knative.dev/pkg/kref/knative_reference.go diff --git a/vendor/knative.dev/pkg/kref/knative_reference.go b/vendor/knative.dev/pkg/kref/knative_reference.go new file mode 100644 index 00000000000..14f99c4e67f --- /dev/null +++ b/vendor/knative.dev/pkg/kref/knative_reference.go @@ -0,0 +1,77 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kref + +import ( + "fmt" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsv1lister "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +// KReferenceResolver is an object that resolves the KReference.Group field +// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086 +type KReferenceResolver struct { + crdLister apiextensionsv1lister.CustomResourceDefinitionLister +} + +// NewKReferenceResolver creates a new KReferenceResolver from a crdLister +// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086 +func NewKReferenceResolver(crdLister apiextensionsv1lister.CustomResourceDefinitionLister) *KReferenceResolver { + return &KReferenceResolver{crdLister: crdLister} +} + +// ResolveGroup resolves the APIVersion of a KReference starting from the Group. +// In order to execute this method, you need RBAC to read the CRD of the Resource referred in this KReference. +// Note: This API is EXPERIMENTAL and might break anytime. For more details: https://github.com/knative/eventing/issues/5086 +func (resolver *KReferenceResolver) ResolveGroup(kr *duckv1.KReference) (*duckv1.KReference, error) { + if kr.Group == "" { + // Nothing to do here + return kr, nil + } + + kr = kr.DeepCopy() + + actualGvk := schema.GroupVersionKind{Group: kr.Group, Kind: kr.Kind} + pluralGvk, _ := meta.UnsafeGuessKindToResource(actualGvk) + crd, err := resolver.crdLister.Get(pluralGvk.GroupResource().String()) + if err != nil { + return nil, err + } + + actualGvk.Version, err = findCRDStorageVersion(crd) + if err != nil { + return nil, err + } + + kr.APIVersion, kr.Kind = actualGvk.ToAPIVersionAndKind() + + return kr, nil +} + +// This function runs under the assumption that there must be exactly one "storage" version +func findCRDStorageVersion(crd *apiextensionsv1.CustomResourceDefinition) (string, error) { + for _, version := range crd.Spec.Versions { + if version.Storage { + return version.Name, nil + } + } + return "", fmt.Errorf("this CRD %s doesn't have a storage version! Kubernetes, you're drunk, go home", crd.Name) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index c6c7e511408..e613d8a491d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1074,6 +1074,7 @@ knative.dev/pkg/injection/sharedmain knative.dev/pkg/kflag knative.dev/pkg/kmeta knative.dev/pkg/kmp +knative.dev/pkg/kref knative.dev/pkg/leaderelection knative.dev/pkg/leaderelection/chaosduck knative.dev/pkg/logging From 8fd4096c0341936e4f77f47bc93d115867f7b8b7 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 15 Jun 2021 15:02:24 +0200 Subject: [PATCH 21/23] Fixed UT Signed-off-by: Francesco Guardiani --- pkg/reconciler/subscription/controller_test.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/subscription/controller_test.go b/pkg/reconciler/subscription/controller_test.go index 1addb7fcf26..befaed5bf14 100644 --- a/pkg/reconciler/subscription/controller_test.go +++ b/pkg/reconciler/subscription/controller_test.go @@ -19,9 +19,13 @@ package subscription import ( "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing/pkg/apis/feature" + // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" @@ -33,7 +37,13 @@ import ( func TestNew(t *testing.T) { ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: feature.FlagsConfigName, + }, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") From dffd6182fab6092cc49b725c6c9772d273948018 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 15 Jun 2021 15:05:29 +0200 Subject: [PATCH 22/23] Missing newline Signed-off-by: Francesco Guardiani --- test/experimental/config/features.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/experimental/config/features.yaml b/test/experimental/config/features.yaml index e5e32b6767e..1138d90b544 100644 --- a/test/experimental/config/features.yaml +++ b/test/experimental/config/features.yaml @@ -22,4 +22,4 @@ metadata: knative.dev/config-propagation: original knative.dev/config-category: eventing data: - kreference-group: "enabled" \ No newline at end of file + kreference-group: "enabled" From 46afa33c663a95f549ca3cbe5e7b398a69e98959 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 15 Jun 2021 15:22:18 +0200 Subject: [PATCH 23/23] Fixed UT Signed-off-by: Francesco Guardiani --- pkg/apis/feature/store.go | 7 ++----- pkg/reconciler/subscription/subscription_test.go | 2 +- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/pkg/apis/feature/store.go b/pkg/apis/feature/store.go index 22a65e7d981..a22f313c232 100644 --- a/pkg/apis/feature/store.go +++ b/pkg/apis/feature/store.go @@ -51,7 +51,7 @@ func FromContextOrDefaults(ctx context.Context) Flags { // ToContext attaches the provided Flags to the provided context, returning the // new context with the Flags attached. func ToContext(ctx context.Context, c Flags) context.Context { - return context.WithValue(ctx, cfgKey{}, c) + return fillContextWithFeatureSpecificFlags(context.WithValue(ctx, cfgKey{}, c), c) } // Store is a typed wrapper around configmap.Untyped store to handle our configmaps. @@ -78,10 +78,7 @@ func NewStore(logger configmap.Logger, onAfterStore ...func(name string, value i // ToContext attaches the current Config state to the provided context. func (s *Store) ToContext(ctx context.Context) context.Context { - flags := s.Load() - ctx = ToContext(ctx, flags) - ctx = fillContextWithFeatureSpecificFlags(ctx, flags) - return ctx + return ToContext(ctx, s.Load()) } // IsEnabled is a shortcut for Load().IsEnabled(featureName) diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 9526f4256e4..7f7c0ce3b90 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -212,7 +212,7 @@ func TestAllCases(t *testing.T) { }, { Name: "subscription goes ready without api version", Ctx: feature.ToContext(context.TODO(), feature.Flags{ - feature.KReferenceGroup: feature.Allowed, + feature.KReferenceGroup: feature.Enabled, }), Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS,