Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/apis/feature"

"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -91,9 +92,12 @@ func NewDefaultingAdmissionController(ctx context.Context, cmw configmap.Watcher
channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store"))
channelStore.WatchConfigs(cmw)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return channelStore.ToContext(store.ToContext(ctx))
return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx)))
Comment thread
slinkydeveloper marked this conversation as resolved.
}

return defaulting.NewAdmissionController(ctx,
Expand Down Expand Up @@ -125,9 +129,13 @@ func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher

pingstore := pingdefaultconfig.NewStore(logging.FromContext(ctx).Named("ping-config-store"))
pingstore.WatchConfigs(cmw)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx)))
return featureStore.ToContext(channelStore.ToContext(pingstore.ToContext(store.ToContext(ctx))))
}

return validation.NewAdmissionController(ctx,
Expand Down Expand Up @@ -201,9 +209,12 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro
channelStore := channeldefaultconfig.NewStore(logging.FromContext(ctx).Named("channel-config-store"))
channelStore.WatchConfigs(cmw)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return channelStore.ToContext(store.ToContext(ctx))
return featureStore.ToContext(channelStore.ToContext(store.ToContext(ctx)))
}

var (
Expand Down
3 changes: 3 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ metadata:
knative.dev/config-propagation: original
knative.dev/config-category: eventing
data:
# ALPHA feature: The kreference-group allows you to use the Group field in KReferences.
# For more details: https://github.com/knative/eventing/issues/5086
kreference-group: "disabled"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably fine for now but eventually it should look more like https://github.com/knative/serving/blob/main/config/core/configmaps/features.yaml.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mhh yeah seems like we don't have the example, but do we really need it? Isn't simpler for the user to have the flags already configured? Also, considering at some point some of them will become enabled by default?

1 change: 1 addition & 0 deletions config/core/resources/subscription.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ spec:
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.'
type: string
x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature
Comment thread
slinkydeveloper marked this conversation as resolved.
Comment thread
slinkydeveloper marked this conversation as resolved.
uri:
description: URI can be an absolute URL(non-empty scheme and non-empty host) pointing to the target or a relative URI. Relative URIs will be resolved using the base URI retrieved from Ref.
type: string
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ limitations under the License.
*/

package feature

const (
KReferenceGroup = "kreference-group"
Comment thread
slinkydeveloper marked this conversation as resolved.
)
10 changes: 9 additions & 1 deletion pkg/apis/feature/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package feature
import (
"context"

duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/configmap"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ func FromContextOrDefaults(ctx context.Context) Flags {
// ToContext attaches the provided Flags to the provided context, returning the
// new context with the Flags attached.
func ToContext(ctx context.Context, c Flags) context.Context {
return context.WithValue(ctx, cfgKey{}, c)
return fillContextWithFeatureSpecificFlags(context.WithValue(ctx, cfgKey{}, c), c)
}

// Store is a typed wrapper around configmap.Untyped store to handle our configmaps.
Expand Down Expand Up @@ -98,3 +99,10 @@ func (s *Store) Load() Flags {
}
return loaded.(Flags)
}

func fillContextWithFeatureSpecificFlags(ctx context.Context, flags Flags) context.Context {
if flags.IsEnabled(KReferenceGroup) {
ctx = duckv1.KReferenceGroupAllowed(ctx)
}
return ctx
}
160 changes: 160 additions & 0 deletions pkg/apis/messaging/v1/subscription_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
Expand Down Expand Up @@ -243,6 +244,165 @@ func TestSubscriptionSpecValidation(t *testing.T) {
}
}

func TestSubscriptionSpecValidationWithKRefGroupFeatureEnabled(t *testing.T) {
tests := []struct {
name string
c *SubscriptionSpec
want *apis.FieldError
}{{
name: "valid",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
},
want: nil,
}, {
name: "valid with reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
Reply: getValidReply(),
},
want: nil,
}, {
name: "empty Channel",
c: &SubscriptionSpec{
Channel: duckv1.KReference{},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channel")
fe.Details = "the Subscription must reference a channel"
return fe
}(),
}, {
name: "missing name in Channel",
c: &SubscriptionSpec{
Channel: duckv1.KReference{
Kind: channelKind,
APIVersion: channelAPIVersion,
},
Subscriber: getValidDestination(),
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channel.name")
return fe
}(),
}, {
name: "missing Subscriber and Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply", "subscriber")
fe.Details = "the Subscription must reference at least one of (reply or a subscriber)"
return fe
}(),
}, {
name: "empty Subscriber and Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: &duckv1.Destination{},
Reply: &duckv1.Destination{},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply", "subscriber")
fe.Details = "the Subscription must reference at least one of (reply or a subscriber)"
return fe
}(),
}, {
name: "missing Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
},
want: nil,
}, {
name: "empty Reply",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
Reply: &duckv1.Destination{},
},
want: nil,
}, {
name: "missing Subscriber",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Reply: getValidReply(),
},
want: nil,
}, {
name: "empty Subscriber",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: &duckv1.Destination{},
Reply: getValidReply(),
},
want: nil,
}, {
name: "missing name in channel, and missing subscriber, reply",
c: &SubscriptionSpec{
Channel: duckv1.KReference{
Kind: channelKind,
APIVersion: channelAPIVersion,
},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply", "subscriber")
fe.Details = "the Subscription must reference at least one of (reply or a subscriber)"
return apis.ErrMissingField("channel.name").Also(fe)
}(),
}, {
name: "empty",
c: &SubscriptionSpec{},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channel")
fe.Details = "the Subscription must reference a channel"
return fe
}(),
}, {
name: "missing name in Subscriber.Ref",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: &duckv1.Destination{
Ref: &duckv1.KReference{
Namespace: namespace,
Kind: channelKind,
APIVersion: channelAPIVersion,
},
},
},
want: apis.ErrMissingField("subscriber.ref.name"),
}, {
name: "missing name in Subscriber.Ref",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Subscriber: getValidDestination(),
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{
Namespace: namespace,
Name: "",
Kind: channelKind,
APIVersion: channelAPIVersion,
},
},
},
want: apis.ErrMissingField("reply.ref.name"),
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := feature.ToContext(context.TODO(), feature.Flags{
feature.KReferenceGroup: feature.Allowed,
})
got := test.c.Validate(ctx)
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("%s: validateChannel (-want, +got) = %v", test.name, diff)
}
})
}
}

func TestSubscriptionImmutable(t *testing.T) {
newChannel := getValidChannelRef()
newChannel.Name = "newChannel"
Expand Down
13 changes: 12 additions & 1 deletion pkg/reconciler/subscription/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ package subscription
import (
"context"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/kref"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"
Expand All @@ -44,12 +47,20 @@ func NewController(
subscriptionInformer := subscription.Get(ctx)
channelInformer := channel.Get(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
Comment thread
slinkydeveloper marked this conversation as resolved.
featureStore.WatchConfigs(cmw)

r := &Reconciler{
dynamicClientSet: dynamicclient.Get(ctx),
kreferenceResolver: kref.NewKReferenceResolver(customresourcedefinition.Get(ctx).Lister()),
subscriptionLister: subscriptionInformer.Lister(),
channelLister: channelInformer.Lister(),
}
impl := subscriptionreconciler.NewImpl(ctx, r)
impl := subscriptionreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

logging.FromContext(ctx).Info("Setting up event handlers")
subscriptionInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand Down
13 changes: 12 additions & 1 deletion pkg/reconciler/subscription/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@ package subscription
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/apis/feature"

// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
},
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
20 changes: 20 additions & 0 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ import (

"knative.dev/pkg/apis/duck"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kref"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1"
Expand All @@ -62,6 +64,9 @@ type Reconciler struct {
// DynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface

// crdLister is used to resolve the ref version
kreferenceResolver *kref.KReferenceResolver

// listers index properties about resources
subscriptionLister listers.SubscriptionLister
channelLister listers.ChannelLister
Expand Down Expand Up @@ -201,6 +206,21 @@ func (r *Reconciler) resolveSubscriber(ctx context.Context, subscription *v1.Sub
if subscriber.Ref != nil {
subscriber.Ref.Namespace = subscription.Namespace
}

// Resolve the group
if subscriber.Ref != nil && feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) {
var err error
subscriber.Ref, err = r.kreferenceResolver.ResolveGroup(subscriber.Ref)
if err != nil {
logging.FromContext(ctx).Warnw("Failed to resolve Subscriber.Ref",
zap.Error(err),
zap.Any("subscriber", subscriber))
subscription.Status.MarkReferencesNotResolved(subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %v", err)
return pkgreconciler.NewEvent(corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber.ref: %w", err)
}
logging.FromContext(ctx).Debugw("Group resolved", zap.Any("spec.subscriber.ref", subscriber.Ref))
Comment thread
slinkydeveloper marked this conversation as resolved.
}

subscriberURI, err := r.destinationResolver.URIFromDestinationV1(ctx, *subscriber, subscription)
if err != nil {
logging.FromContext(ctx).Warnw("Failed to resolve Subscriber",
Expand Down
Loading