Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/core/resources/subscription.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ spec:
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
x-kubernetes-preserve-unknown-fields: true # This is necessary to enable the experimental feature
delivery:
description: Delivery configuration
type: object
Expand Down
12 changes: 12 additions & 0 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,18 @@ func (r *Reconciler) getSubStatus(subscription *v1.Subscription, channel *eventi
}

func (r *Reconciler) trackAndFetchChannel(ctx context.Context, sub *v1.Subscription, ref duckv1.KReference) (runtime.Object, pkgreconciler.Event) {
// Resolve the group
if feature.FromContext(ctx).IsEnabled(feature.KReferenceGroup) {
newRef, err := r.kreferenceResolver.ResolveGroup(&ref)
if err != nil {
logging.FromContext(ctx).Warnw("Failed to resolve Channel reference",
zap.Error(err),
zap.Any("ref", ref))
return nil, err
}
ref = *newRef
}

// Track the channel using the channelableTracker.
// We don't need the explicitly set a channelInformer, as this will dynamically generate one for us.
// This code needs to be called before checking the existence of the `channel`, in order to make sure the
Expand Down
253 changes: 253 additions & 0 deletions pkg/reconciler/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,85 @@ func TestAllCases(t *testing.T) {
Ready: "True",
}}),
),
// Subscriber CRD
eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev",
eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Storage: false,
}, {
Name: "v1",
Storage: true,
}}),
),
},
Key: testNS + "/" + subscriptionName,
WantErr: false,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannel(imcV1GVK, channelName),
WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
// - Status Update -
MarkSubscriptionReady,
),
}},
}, {
Name: "subscription goes ready with both api version and group",
Ctx: feature.ToContext(context.TODO(), feature.Flags{
feature.KReferenceGroup: feature.Enabled,
}),
Objects: []runtime.Object{
NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannel(imcV1GVK, channelName),
WithSubscriptionSubscriberRefUsingApiVersionAndGroup(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
MarkReferencesResolved,
MarkAddedToChannel,
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
),
// Subscriber
NewUnstructured(subscriberGVK, subscriberName, testNS,
WithUnstructuredAddressable(subscriberDNS),
),
// Reply
NewInMemoryChannel(replyName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelAddress(replyDNS),
),
// Channel
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelReady(channelDNS),
WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{
UID: subscriptionUID,
Generation: 0,
SubscriberURI: subscriberURI,
ReplyURI: replyURI,
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}}),
WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{
UID: subscriptionUID,
ObservedGeneration: 0,
Ready: "True",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: "True",
}}),
),
// IMC CRD
eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev",
eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Expand All @@ -278,10 +357,184 @@ func TestAllCases(t *testing.T) {
Object: NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannel(imcV1GVK, channelName),
WithSubscriptionSubscriberRefUsingApiVersionAndGroup(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
// - Status Update -
MarkSubscriptionReady,
),
}},
}, {
Name: "subscription goes ready with Channel.Group and Subscriber.Ref.Group",
Ctx: feature.ToContext(context.TODO(), feature.Flags{
feature.KReferenceGroup: feature.Enabled,
}),
Objects: []runtime.Object{
NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannelUsingGroup(imcV1GVK, channelName),
WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
MarkReferencesResolved,
MarkAddedToChannel,
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
),
// Subscriber
NewUnstructured(subscriberGVK, subscriberName, testNS,
WithUnstructuredAddressable(subscriberDNS),
),
// Reply
NewInMemoryChannel(replyName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelAddress(replyDNS),
),
// Channel
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelReady(channelDNS),
WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{
UID: subscriptionUID,
Generation: 0,
SubscriberURI: subscriberURI,
ReplyURI: replyURI,
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}}),
WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{
UID: subscriptionUID,
ObservedGeneration: 0,
Ready: "True",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: "True",
}}),
),
eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev",
eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Storage: false,
}, {
Name: "v1",
Storage: true,
}}),
),
eventingtesting.NewCustomResourceDefinition("inmemorychannels.messaging.knative.dev",
eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Storage: false,
}, {
Name: "v1",
Storage: true,
}}),
),
},
Key: testNS + "/" + subscriptionName,
WantErr: false,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannelUsingGroup(imcV1GVK, channelName),
WithSubscriptionSubscriberRefUsingGroup(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
// - Status Update -
MarkSubscriptionReady,
),
}},
}, {
Name: "subscription goes ready with Channel.Group and Channel.APIVersion",
Ctx: feature.ToContext(context.TODO(), feature.Flags{
feature.KReferenceGroup: feature.Enabled,
}),
Objects: []runtime.Object{
NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannelUsingApiVersionAndGroup(imcV1GVK, channelName),
WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
MarkReferencesResolved,
MarkAddedToChannel,
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
),
// Subscriber
NewUnstructured(subscriberGVK, subscriberName, testNS,
WithUnstructuredAddressable(subscriberDNS),
),
// Reply
NewInMemoryChannel(replyName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelAddress(replyDNS),
),
// Channel
NewInMemoryChannel(channelName, testNS,
WithInitInMemoryChannelConditions,
WithInMemoryChannelReady(channelDNS),
WithInMemoryChannelSubscribers([]eventingduck.SubscriberSpec{{
UID: subscriptionUID,
Generation: 0,
SubscriberURI: subscriberURI,
ReplyURI: replyURI,
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}}),
WithInMemoryChannelStatusSubscribers([]eventingduck.SubscriberStatus{{
UID: subscriptionUID,
ObservedGeneration: 0,
Ready: "True",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: "True",
}}),
),
eventingtesting.NewCustomResourceDefinition("subscribers.messaging.knative.dev",
eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Storage: false,
}, {
Name: "v1",
Storage: true,
}}),
),
eventingtesting.NewCustomResourceDefinition("inmemorychannels.messaging.knative.dev",
eventingtesting.WithCustomResourceDefinitionVersions([]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1beta1",
Storage: false,
}, {
Name: "v1",
Storage: true,
}}),
),
},
Key: testNS + "/" + subscriptionName,
WantErr: false,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: NewSubscription(subscriptionName, testNS,
WithSubscriptionUID(subscriptionUID),
WithSubscriptionChannelUsingApiVersionAndGroup(imcV1GVK, channelName),
WithSubscriptionSubscriberRef(subscriberGVK, subscriberName, testNS),
WithSubscriptionReply(imcV1GVK, replyName, testNS),
WithInitSubscriptionConditions,
WithSubscriptionFinalizers(finalizerName),
WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI),
WithSubscriptionPhysicalSubscriptionReply(replyURI),
// - Status Update -
Expand Down
35 changes: 35 additions & 0 deletions pkg/reconciler/testing/v1/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,27 @@ func WithSubscriptionChannel(gvk metav1.GroupVersionKind, name string) Subscript
}
}

func WithSubscriptionChannelUsingGroup(gvk metav1.GroupVersionKind, name string) SubscriptionOption {
return func(s *v1.Subscription) {
s.Spec.Channel = duckv1.KReference{
Group: gvk.Group,
Kind: gvk.Kind,
Name: name,
}
}
}

func WithSubscriptionChannelUsingApiVersionAndGroup(gvk metav1.GroupVersionKind, name string) SubscriptionOption {
return func(s *v1.Subscription) {
s.Spec.Channel = duckv1.KReference{
APIVersion: apiVersion(gvk),
Group: gvk.Group,
Kind: gvk.Kind,
Name: name,
}
}
}

func WithSubscriptionSubscriberRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption {
return func(s *v1.Subscription) {
s.Spec.Subscriber = &duckv1.Destination{
Expand All @@ -151,6 +172,20 @@ func WithSubscriptionSubscriberRefUsingGroup(gvk metav1.GroupVersionKind, name,
}
}

func WithSubscriptionSubscriberRefUsingApiVersionAndGroup(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption {
return func(s *v1.Subscription) {
s.Spec.Subscriber = &duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: apiVersion(gvk),
Group: gvk.Group,
Kind: gvk.Kind,
Name: name,
Namespace: namespace,
},
}
}
}

func WithSubscriptionDeliveryRef(gvk metav1.GroupVersionKind, name, namespace string) SubscriptionOption {
return func(s *v1.Subscription) {
s.Spec.Delivery = &eventingduckv1.DeliverySpec{
Expand Down
Loading