diff --git a/pkg/apis/messaging/v1alpha1/subscription_types.go b/pkg/apis/messaging/v1alpha1/subscription_types.go index 77c141852a1..f8bcfc9d1e3 100644 --- a/pkg/apis/messaging/v1alpha1/subscription_types.go +++ b/pkg/apis/messaging/v1alpha1/subscription_types.go @@ -120,11 +120,17 @@ type SubscriptionSpec struct { // ReplyStrategy specifies the handling of the Subscriber's returned replies. // If no Subscriber is specified, the identity function is assumed. type ReplyStrategy struct { + // The resource pointed by this Destination must meet the Addressable contract + // with a reference to the Addressable duck type. If the resource does not meet this contract, + // it will be reflected in the Subscription's status. + // +optional + *duckv1beta1.Destination `json:",inline"` + // The resource pointed by this ObjectReference must meet the Addressable contract // with a reference to the Addressable duck type. If the resource does not meet this contract, // it will be reflected in the Subscription's status. // +optional - Channel *duckv1beta1.Destination `json:"channel,omitempty"` + DeprecatedChannel *duckv1beta1.Destination `json:"channel,omitempty"` } // SubscriptionStatus (computed) for a subscription diff --git a/pkg/apis/messaging/v1alpha1/subscription_validation.go b/pkg/apis/messaging/v1alpha1/subscription_validation.go index 58f9f859a09..9fbb19c7e02 100644 --- a/pkg/apis/messaging/v1alpha1/subscription_validation.go +++ b/pkg/apis/messaging/v1alpha1/subscription_validation.go @@ -58,8 +58,16 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { } if !missingReply { - if fe := ss.Reply.Channel.Validate(ctx); fe != nil { - errs = errs.Also(fe.ViaField("reply.channel")) + if !isDestinationNilOrEmpty(ss.Reply.DeprecatedChannel) && !isDestinationNilOrEmpty(ss.Reply.Destination) { + errs = errs.Also(apis.ErrGeneric("channel and [ref, uri] can't be both present", "reply.channel", "reply.ref", "reply.uri")) + } else if !isDestinationNilOrEmpty(ss.Reply.DeprecatedChannel) { + if fe := ss.Reply.DeprecatedChannel.Validate(ctx); fe != nil { + errs = errs.Also(fe.ViaField("reply.channel")) + } + } else { + if fe := ss.Reply.Destination.Validate(ctx); fe != nil { + errs = errs.Also(fe.ViaField("reply")) + } } } @@ -71,7 +79,8 @@ func isDestinationNilOrEmpty(d *duckv1beta1.Destination) bool { } func isReplyStrategyNilOrEmpty(r *ReplyStrategy) bool { - return r == nil || equality.Semantic.DeepEqual(r, &ReplyStrategy{}) || equality.Semantic.DeepEqual(r.Channel, &duckv1beta1.Destination{}) + return r == nil || equality.Semantic.DeepEqual(r, &ReplyStrategy{}) || + (equality.Semantic.DeepEqual(r.DeprecatedChannel, &duckv1beta1.Destination{}) && (equality.Semantic.DeepEqual(r.Destination, &duckv1beta1.Destination{}))) } func (s *Subscription) CheckImmutableFields(ctx context.Context, og apis.Immutable) *apis.FieldError { diff --git a/pkg/apis/messaging/v1alpha1/subscription_validation_test.go b/pkg/apis/messaging/v1alpha1/subscription_validation_test.go index 32cca9e2fd5..27c6816dff7 100644 --- a/pkg/apis/messaging/v1alpha1/subscription_validation_test.go +++ b/pkg/apis/messaging/v1alpha1/subscription_validation_test.go @@ -45,7 +45,7 @@ func getValidChannelRef() corev1.ObjectReference { func getValidReplyStrategy() *ReplyStrategy { return &ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ DeprecatedName: replyChannelName, DeprecatedKind: channelKind, DeprecatedAPIVersion: channelAPIVersion, @@ -223,11 +223,11 @@ func TestSubscriptionSpecValidation(t *testing.T) { return fe }(), }, { - name: "missing name in Reply.Ref", + name: "missing name in Reply.DeprecatedChannel.Ref", c: &SubscriptionSpec{ Channel: getValidChannelRef(), Reply: &ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + DeprecatedChannel: &duckv1beta1.Destination{ DeprecatedKind: channelKind, DeprecatedAPIVersion: channelAPIVersion, }, @@ -237,6 +237,21 @@ func TestSubscriptionSpecValidation(t *testing.T) { fe := apis.ErrMissingField("reply.channel.name") return fe }(), + }, { + name: "missing name in Reply.Ref", + c: &SubscriptionSpec{ + Channel: getValidChannelRef(), + Reply: &ReplyStrategy{ + Destination: &duckv1beta1.Destination{ + DeprecatedKind: channelKind, + DeprecatedAPIVersion: channelAPIVersion, + }, + }, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply.name") + return fe + }(), }} for _, test := range tests { @@ -257,7 +272,7 @@ func TestSubscriptionImmutable(t *testing.T) { newSubscriber.Ref.Name = "newSubscriber" newReply := getValidReplyStrategy() - newReply.Channel.DeprecatedName = "newReplyChannel" + newReply.Destination.DeprecatedName = "newReplyChannel" tests := []struct { name string diff --git a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index cda331014a0..9bf09042c1d 100644 --- a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -445,8 +445,13 @@ func (in *ParallelSubscriptionStatus) DeepCopy() *ParallelSubscriptionStatus { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReplyStrategy) DeepCopyInto(out *ReplyStrategy) { *out = *in - if in.Channel != nil { - in, out := &in.Channel, &out.Channel + if in.Destination != nil { + in, out := &in.Destination, &out.Destination + *out = new(v1beta1.Destination) + (*in).DeepCopyInto(*out) + } + if in.DeprecatedChannel != nil { + in, out := &in.DeprecatedChannel, &out.DeprecatedChannel *out = new(v1beta1.Destination) (*in).DeepCopyInto(*out) } diff --git a/pkg/reconciler/parallel/resources/subscription.go b/pkg/reconciler/parallel/resources/subscription.go index ebd99bc5664..5db6d955cc3 100644 --- a/pkg/reconciler/parallel/resources/subscription.go +++ b/pkg/reconciler/parallel/resources/subscription.go @@ -59,7 +59,7 @@ func NewFilterSubscription(branchNumber int, p *v1alpha1.Parallel) *v1alpha1.Sub }, } r.Spec.Reply = &v1alpha1.ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ Ref: &corev1.ObjectReference{ APIVersion: p.Spec.ChannelTemplate.APIVersion, Kind: p.Spec.ChannelTemplate.Kind, @@ -94,9 +94,9 @@ func NewSubscription(branchNumber int, p *v1alpha1.Parallel) *v1alpha1.Subscript } if p.Spec.Branches[branchNumber].Reply != nil { - r.Spec.Reply = &v1alpha1.ReplyStrategy{Channel: p.Spec.Branches[branchNumber].Reply} + r.Spec.Reply = &v1alpha1.ReplyStrategy{Destination: p.Spec.Branches[branchNumber].Reply} } else if p.Spec.Reply != nil { - r.Spec.Reply = &v1alpha1.ReplyStrategy{Channel: p.Spec.Reply} + r.Spec.Reply = &v1alpha1.ReplyStrategy{Destination: p.Spec.Reply} } return r } diff --git a/pkg/reconciler/sequence/resources/subscription.go b/pkg/reconciler/sequence/resources/subscription.go index 7a3f3a07494..a053a32ed94 100644 --- a/pkg/reconciler/sequence/resources/subscription.go +++ b/pkg/reconciler/sequence/resources/subscription.go @@ -58,7 +58,7 @@ func NewSubscription(stepNumber int, p *v1alpha1.Sequence) *v1alpha1.Subscriptio // last one, we'll use the (optional) reply from the Sequence Spec. if stepNumber < len(p.Spec.Steps)-1 { r.Spec.Reply = &v1alpha1.ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ Ref: &corev1.ObjectReference{ APIVersion: p.Spec.ChannelTemplate.APIVersion, Kind: p.Spec.ChannelTemplate.Kind, @@ -67,7 +67,7 @@ func NewSubscription(stepNumber int, p *v1alpha1.Sequence) *v1alpha1.Subscriptio }, } } else if p.Spec.Reply != nil { - r.Spec.Reply = &v1alpha1.ReplyStrategy{Channel: p.Spec.Reply} + r.Spec.Reply = &v1alpha1.ReplyStrategy{Destination: p.Spec.Reply} } return r } diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 61af6e7f7d1..cb8950efb5e 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -220,17 +220,30 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc subscription.Status.PhysicalSubscription.ReplyURI = nil subscription.Status.ClearDeprecated() if !isNilOrEmptyReply(reply) { + hasDeprecatedReplyStatus := false + var destination *duckv1beta1.Destination + if reply.DeprecatedChannel != nil && !equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{}) { + destination = reply.DeprecatedChannel + // Add a condition warning that the fields are deprecated. + subscription.Status.MarkReplyDeprecatedRef(replyFieldsDeprecated, "Using deprecated channel field when specifying spec.reply. Update to spec.reply.ref or spec.reply.uri. These will be removed in future release") + hasDeprecatedReplyStatus = true + } else { + destination = reply.Destination + } + // Populate the namespace for the subscriber since it is in the namespace - if reply.Channel.Ref != nil { - reply.Channel.Ref.Namespace = subscription.Namespace - } else if reply.Channel.DeprecatedName != "" { + if destination.Ref != nil { + destination.Ref.Namespace = subscription.Namespace + } else if destination.DeprecatedName != "" { // Add the check for DeprecatedName, since without that it wouldn't // have passed validation. - reply.Channel.DeprecatedNamespace = subscription.Namespace - // Add a condition warning that the fields are deprecated. - subscription.Status.MarkReplyDeprecatedRef(replyFieldsDeprecated, "Using deprecated object ref fields when specifying spec.reply. Update to spec.reply.ref. These will be removed in 0.11") + destination.DeprecatedNamespace = subscription.Namespace + if !hasDeprecatedReplyStatus { + // Add a condition warning that the fields are deprecated. + subscription.Status.MarkReplyDeprecatedRef(replyFieldsDeprecated, "Using deprecated object ref fields when specifying spec.reply. Update to spec.reply.ref. These will be removed in 0.11") + } } - replyURIStr, err := r.destinationResolver.URIFromDestination(*reply.Channel, subscription) + replyURIStr, err := r.destinationResolver.URIFromDestination(*destination, subscription) if err != nil { logging.FromContext(ctx).Warn("Failed to resolve reply", zap.Error(err), @@ -241,11 +254,11 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc } replyURI, err := apis.ParseURL(replyURIStr) if err != nil { - logging.FromContext(ctx).Warn("Failed to parse URL for spec.Reply.Channel URL", + logging.FromContext(ctx).Warn("Failed to parse URL for spec.reply URL", zap.Error(err), - zap.Any("reply.channel", reply.Channel)) - r.Recorder.Eventf(subscription, corev1.EventTypeWarning, replyResolveFailed, "Failed to parse URL for spec.reply.channel: %v", err) - subscription.Status.MarkReferencesNotResolved(replyResolveFailed, "Failed to parse URL for spec.reply.channel: %v", err) + zap.Any("reply", destination)) + r.Recorder.Eventf(subscription, corev1.EventTypeWarning, replyResolveFailed, "Failed to parse URL for spec.reply: %v", err) + subscription.Status.MarkReferencesNotResolved(replyResolveFailed, "Failed to parse URL for spec.reply: %v", err) return err } @@ -391,8 +404,9 @@ func (r *Reconciler) validateChannel(ctx context.Context, channel *eventingduckv return nil } -func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool { - return reply == nil || equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{}) +func isNilOrEmptyReply(r *v1alpha1.ReplyStrategy) bool { + return r == nil || equality.Semantic.DeepEqual(r, &v1alpha1.ReplyStrategy{}) || + (equality.Semantic.DeepEqual(r.DeprecatedChannel, &duckv1beta1.Destination{}) && (equality.Semantic.DeepEqual(r.Destination, &duckv1beta1.Destination{}))) } func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1alpha1.DeliverySpec) bool { return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1alpha1.DeliverySpec{}) || diff --git a/pkg/reconciler/testing/subscription.go b/pkg/reconciler/testing/subscription.go index ba46a1da7a9..3d3c1bfb00c 100644 --- a/pkg/reconciler/testing/subscription.go +++ b/pkg/reconciler/testing/subscription.go @@ -172,7 +172,7 @@ func WithSubscriptionReferencesNotResolved(reason, msg string) SubscriptionOptio func WithSubscriptionReply(gvk metav1.GroupVersionKind, name string) SubscriptionOption { return func(s *v1alpha1.Subscription) { s.Spec.Reply = &v1alpha1.ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ DeprecatedAPIVersion: apiVersion(gvk), DeprecatedKind: gvk.Kind, DeprecatedName: name, @@ -184,7 +184,7 @@ func WithSubscriptionReply(gvk metav1.GroupVersionKind, name string) Subscriptio func WithSubscriptionReplyNotDeprecated(gvk metav1.GroupVersionKind, name string) SubscriptionOption { return func(s *v1alpha1.Subscription) { s.Spec.Reply = &v1alpha1.ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ Ref: &corev1.ObjectReference{ APIVersion: apiVersion(gvk), Kind: gvk.Kind, diff --git a/pkg/reconciler/trigger/resources/subscription.go b/pkg/reconciler/trigger/resources/subscription.go index 3a7a1654edf..c80e1095aa8 100644 --- a/pkg/reconciler/trigger/resources/subscription.go +++ b/pkg/reconciler/trigger/resources/subscription.go @@ -59,7 +59,7 @@ func NewSubscription(t *eventingv1alpha1.Trigger, brokerTrigger, brokerIngress * URI: tmpURI, }, Reply: &messagingv1alpha1.ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ Ref: &corev1.ObjectReference{ APIVersion: brokerIngress.APIVersion, Kind: brokerIngress.Kind, diff --git a/test/base/resources/eventing.go b/test/base/resources/eventing.go index c5bd08399f4..16668609fd9 100644 --- a/test/base/resources/eventing.go +++ b/test/base/resources/eventing.go @@ -63,7 +63,7 @@ func WithReplyForSubscription(name string, typemeta *metav1.TypeMeta) Subscripti return func(s *messagingv1alpha1.Subscription) { if name != "" { s.Spec.Reply = &messagingv1alpha1.ReplyStrategy{ - Channel: &duckv1beta1.Destination{ + Destination: &duckv1beta1.Destination{ Ref: pkgTest.CoreV1ObjectReference(typemeta.Kind, typemeta.APIVersion, name), }, }