Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/apis/messaging/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,17 @@ type SubscriptionSpec struct {
// ReplyStrategy specifies the handling of the Subscriber's returned replies.
// If no Subscriber is specified, the identity function is assumed.
type ReplyStrategy struct {
// The resource pointed by this Destination must meet the Addressable contract
// with a reference to the Addressable duck type. If the resource does not meet this contract,
// it will be reflected in the Subscription's status.
// +optional
*duckv1beta1.Destination `json:",inline"`

// The resource pointed by this ObjectReference must meet the Addressable contract
// with a reference to the Addressable duck type. If the resource does not meet this contract,
// it will be reflected in the Subscription's status.
// +optional
Channel *duckv1beta1.Destination `json:"channel,omitempty"`
DeprecatedChannel *duckv1beta1.Destination `json:"channel,omitempty"`
}

// SubscriptionStatus (computed) for a subscription
Expand Down
15 changes: 12 additions & 3 deletions pkg/apis/messaging/v1alpha1/subscription_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,16 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError {
}

if !missingReply {
if fe := ss.Reply.Channel.Validate(ctx); fe != nil {
errs = errs.Also(fe.ViaField("reply.channel"))
if !isDestinationNilOrEmpty(ss.Reply.DeprecatedChannel) && !isDestinationNilOrEmpty(ss.Reply.Destination) {
errs = errs.Also(apis.ErrGeneric("channel and [ref, uri] can't be both present", "reply.channel", "reply.ref", "reply.uri"))
} else if !isDestinationNilOrEmpty(ss.Reply.DeprecatedChannel) {
if fe := ss.Reply.DeprecatedChannel.Validate(ctx); fe != nil {
errs = errs.Also(fe.ViaField("reply.channel"))
}
} else {
if fe := ss.Reply.Destination.Validate(ctx); fe != nil {
errs = errs.Also(fe.ViaField("reply"))
}
}
}

Expand All @@ -71,7 +79,8 @@ func isDestinationNilOrEmpty(d *duckv1beta1.Destination) bool {
}

func isReplyStrategyNilOrEmpty(r *ReplyStrategy) bool {
return r == nil || equality.Semantic.DeepEqual(r, &ReplyStrategy{}) || equality.Semantic.DeepEqual(r.Channel, &duckv1beta1.Destination{})
return r == nil || equality.Semantic.DeepEqual(r, &ReplyStrategy{}) ||
(equality.Semantic.DeepEqual(r.DeprecatedChannel, &duckv1beta1.Destination{}) && (equality.Semantic.DeepEqual(r.Destination, &duckv1beta1.Destination{})))
}

func (s *Subscription) CheckImmutableFields(ctx context.Context, og apis.Immutable) *apis.FieldError {
Expand Down
23 changes: 19 additions & 4 deletions pkg/apis/messaging/v1alpha1/subscription_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func getValidChannelRef() corev1.ObjectReference {

func getValidReplyStrategy() *ReplyStrategy {
return &ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
DeprecatedName: replyChannelName,
DeprecatedKind: channelKind,
DeprecatedAPIVersion: channelAPIVersion,
Expand Down Expand Up @@ -223,11 +223,11 @@ func TestSubscriptionSpecValidation(t *testing.T) {
return fe
}(),
}, {
name: "missing name in Reply.Ref",
name: "missing name in Reply.DeprecatedChannel.Ref",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Reply: &ReplyStrategy{
Channel: &duckv1beta1.Destination{
DeprecatedChannel: &duckv1beta1.Destination{
DeprecatedKind: channelKind,
DeprecatedAPIVersion: channelAPIVersion,
},
Expand All @@ -237,6 +237,21 @@ func TestSubscriptionSpecValidation(t *testing.T) {
fe := apis.ErrMissingField("reply.channel.name")
return fe
}(),
}, {
name: "missing name in Reply.Ref",
c: &SubscriptionSpec{
Channel: getValidChannelRef(),
Reply: &ReplyStrategy{
Destination: &duckv1beta1.Destination{
DeprecatedKind: channelKind,
DeprecatedAPIVersion: channelAPIVersion,
},
},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("reply.name")
return fe
}(),
}}

for _, test := range tests {
Expand All @@ -257,7 +272,7 @@ func TestSubscriptionImmutable(t *testing.T) {
newSubscriber.Ref.Name = "newSubscriber"

newReply := getValidReplyStrategy()
newReply.Channel.DeprecatedName = "newReplyChannel"
newReply.Destination.DeprecatedName = "newReplyChannel"

tests := []struct {
name string
Expand Down
9 changes: 7 additions & 2 deletions pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/reconciler/parallel/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewFilterSubscription(branchNumber int, p *v1alpha1.Parallel) *v1alpha1.Sub
},
}
r.Spec.Reply = &v1alpha1.ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: p.Spec.ChannelTemplate.APIVersion,
Kind: p.Spec.ChannelTemplate.Kind,
Expand Down Expand Up @@ -94,9 +94,9 @@ func NewSubscription(branchNumber int, p *v1alpha1.Parallel) *v1alpha1.Subscript
}

if p.Spec.Branches[branchNumber].Reply != nil {
r.Spec.Reply = &v1alpha1.ReplyStrategy{Channel: p.Spec.Branches[branchNumber].Reply}
r.Spec.Reply = &v1alpha1.ReplyStrategy{Destination: p.Spec.Branches[branchNumber].Reply}
} else if p.Spec.Reply != nil {
r.Spec.Reply = &v1alpha1.ReplyStrategy{Channel: p.Spec.Reply}
r.Spec.Reply = &v1alpha1.ReplyStrategy{Destination: p.Spec.Reply}
}
return r
}
4 changes: 2 additions & 2 deletions pkg/reconciler/sequence/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewSubscription(stepNumber int, p *v1alpha1.Sequence) *v1alpha1.Subscriptio
// last one, we'll use the (optional) reply from the Sequence Spec.
if stepNumber < len(p.Spec.Steps)-1 {
r.Spec.Reply = &v1alpha1.ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: p.Spec.ChannelTemplate.APIVersion,
Kind: p.Spec.ChannelTemplate.Kind,
Expand All @@ -67,7 +67,7 @@ func NewSubscription(stepNumber int, p *v1alpha1.Sequence) *v1alpha1.Subscriptio
},
}
} else if p.Spec.Reply != nil {
r.Spec.Reply = &v1alpha1.ReplyStrategy{Channel: p.Spec.Reply}
r.Spec.Reply = &v1alpha1.ReplyStrategy{Destination: p.Spec.Reply}
}
return r
}
40 changes: 27 additions & 13 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,30 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc
subscription.Status.PhysicalSubscription.ReplyURI = nil
subscription.Status.ClearDeprecated()
if !isNilOrEmptyReply(reply) {
hasDeprecatedReplyStatus := false
var destination *duckv1beta1.Destination
if reply.DeprecatedChannel != nil && !equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{}) {
destination = reply.DeprecatedChannel
// Add a condition warning that the fields are deprecated.
subscription.Status.MarkReplyDeprecatedRef(replyFieldsDeprecated, "Using deprecated channel field when specifying spec.reply. Update to spec.reply.ref or spec.reply.uri. These will be removed in future release")
hasDeprecatedReplyStatus = true
} else {
destination = reply.Destination
}

// Populate the namespace for the subscriber since it is in the namespace
if reply.Channel.Ref != nil {
reply.Channel.Ref.Namespace = subscription.Namespace
} else if reply.Channel.DeprecatedName != "" {
if destination.Ref != nil {
destination.Ref.Namespace = subscription.Namespace
} else if destination.DeprecatedName != "" {
// Add the check for DeprecatedName, since without that it wouldn't
// have passed validation.
reply.Channel.DeprecatedNamespace = subscription.Namespace
// Add a condition warning that the fields are deprecated.
subscription.Status.MarkReplyDeprecatedRef(replyFieldsDeprecated, "Using deprecated object ref fields when specifying spec.reply. Update to spec.reply.ref. These will be removed in 0.11")
destination.DeprecatedNamespace = subscription.Namespace
if !hasDeprecatedReplyStatus {
// Add a condition warning that the fields are deprecated.
subscription.Status.MarkReplyDeprecatedRef(replyFieldsDeprecated, "Using deprecated object ref fields when specifying spec.reply. Update to spec.reply.ref. These will be removed in 0.11")
}
}
replyURIStr, err := r.destinationResolver.URIFromDestination(*reply.Channel, subscription)
replyURIStr, err := r.destinationResolver.URIFromDestination(*destination, subscription)
if err != nil {
logging.FromContext(ctx).Warn("Failed to resolve reply",
zap.Error(err),
Expand All @@ -241,11 +254,11 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc
}
replyURI, err := apis.ParseURL(replyURIStr)
if err != nil {
logging.FromContext(ctx).Warn("Failed to parse URL for spec.Reply.Channel URL",
logging.FromContext(ctx).Warn("Failed to parse URL for spec.reply URL",
zap.Error(err),
zap.Any("reply.channel", reply.Channel))
r.Recorder.Eventf(subscription, corev1.EventTypeWarning, replyResolveFailed, "Failed to parse URL for spec.reply.channel: %v", err)
subscription.Status.MarkReferencesNotResolved(replyResolveFailed, "Failed to parse URL for spec.reply.channel: %v", err)
zap.Any("reply", destination))
r.Recorder.Eventf(subscription, corev1.EventTypeWarning, replyResolveFailed, "Failed to parse URL for spec.reply: %v", err)
subscription.Status.MarkReferencesNotResolved(replyResolveFailed, "Failed to parse URL for spec.reply: %v", err)
return err
}

Expand Down Expand Up @@ -391,8 +404,9 @@ func (r *Reconciler) validateChannel(ctx context.Context, channel *eventingduckv
return nil
}

func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool {
return reply == nil || equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{})
func isNilOrEmptyReply(r *v1alpha1.ReplyStrategy) bool {
return r == nil || equality.Semantic.DeepEqual(r, &v1alpha1.ReplyStrategy{}) ||
(equality.Semantic.DeepEqual(r.DeprecatedChannel, &duckv1beta1.Destination{}) && (equality.Semantic.DeepEqual(r.Destination, &duckv1beta1.Destination{})))
}
func isNilOrEmptyDeliveryDeadLetterSink(delivery *eventingduckv1alpha1.DeliverySpec) bool {
return delivery == nil || equality.Semantic.DeepEqual(delivery, &eventingduckv1alpha1.DeliverySpec{}) ||
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/testing/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func WithSubscriptionReferencesNotResolved(reason, msg string) SubscriptionOptio
func WithSubscriptionReply(gvk metav1.GroupVersionKind, name string) SubscriptionOption {
return func(s *v1alpha1.Subscription) {
s.Spec.Reply = &v1alpha1.ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
DeprecatedAPIVersion: apiVersion(gvk),
DeprecatedKind: gvk.Kind,
DeprecatedName: name,
Expand All @@ -184,7 +184,7 @@ func WithSubscriptionReply(gvk metav1.GroupVersionKind, name string) Subscriptio
func WithSubscriptionReplyNotDeprecated(gvk metav1.GroupVersionKind, name string) SubscriptionOption {
return func(s *v1alpha1.Subscription) {
s.Spec.Reply = &v1alpha1.ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: apiVersion(gvk),
Kind: gvk.Kind,
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/trigger/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewSubscription(t *eventingv1alpha1.Trigger, brokerTrigger, brokerIngress *
URI: tmpURI,
},
Reply: &messagingv1alpha1.ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: brokerIngress.APIVersion,
Kind: brokerIngress.Kind,
Expand Down
2 changes: 1 addition & 1 deletion test/base/resources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func WithReplyForSubscription(name string, typemeta *metav1.TypeMeta) Subscripti
return func(s *messagingv1alpha1.Subscription) {
if name != "" {
s.Spec.Reply = &messagingv1alpha1.ReplyStrategy{
Channel: &duckv1beta1.Destination{
Destination: &duckv1beta1.Destination{
Ref: pkgTest.CoreV1ObjectReference(typemeta.Kind, typemeta.APIVersion, name),
},
}
Expand Down