From 7becbeeec6290c08eabc4c1edb50b2f5bc4ca877 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Wed, 19 Feb 2020 16:45:55 -0500 Subject: [PATCH 1/4] add delivery options to sequence v1beta1 --- pkg/apis/flows/v1beta1/sequence_defaults.go | 6 +++++ .../flows/v1beta1/sequence_defaults_test.go | 16 +++++++----- pkg/apis/flows/v1beta1/sequence_types.go | 15 +++++++++-- pkg/apis/flows/v1beta1/sequence_validation.go | 12 +++++++++ .../flows/v1beta1/zz_generated.deepcopy.go | 25 ++++++++++++++++++- 5 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pkg/apis/flows/v1beta1/sequence_defaults.go b/pkg/apis/flows/v1beta1/sequence_defaults.go index 260bccb44f0..34b7f46863e 100644 --- a/pkg/apis/flows/v1beta1/sequence_defaults.go +++ b/pkg/apis/flows/v1beta1/sequence_defaults.go @@ -46,3 +46,9 @@ func (ss *SequenceSpec) SetDefaults(ctx context.Context) { ss.Reply.SetDefaults(ctx) } } + +func (ss *SequenceStep) SetDefaults(ctx context.Context) { + ss.Subscriber.SetDefaults(ctx) + + // No delivery defaults. +} diff --git a/pkg/apis/flows/v1beta1/sequence_defaults_test.go b/pkg/apis/flows/v1beta1/sequence_defaults_test.go index 5882c30c2d8..9d7f574533c 100644 --- a/pkg/apis/flows/v1beta1/sequence_defaults_test.go +++ b/pkg/apis/flows/v1beta1/sequence_defaults_test.go @@ -68,9 +68,11 @@ func TestSequenceSetDefaults(t *testing.T) { initial: Sequence{ ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Name: "first"}}, - {Ref: &duckv1.KReference{Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Name: "first"}}}, + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Name: "reply"}, @@ -81,9 +83,11 @@ func TestSequenceSetDefaults(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ ChannelTemplate: defaultChannelTemplate, - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}, - {Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}}, + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Namespace: testNS, Name: "reply"}, diff --git a/pkg/apis/flows/v1beta1/sequence_types.go b/pkg/apis/flows/v1beta1/sequence_types.go index 60a637f9ae9..cf8be7127b5 100644 --- a/pkg/apis/flows/v1beta1/sequence_types.go +++ b/pkg/apis/flows/v1beta1/sequence_types.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -64,8 +65,8 @@ var ( type SequenceSpec struct { // Steps is the list of Destinations (processors / functions) that will be called in the order - // provided. - Steps []duckv1.Destination `json:"steps"` + // provided. Each step has its own delivery options + Steps []SequenceStep `json:"steps"` // ChannelTemplate specifies which Channel CRD to use. If left unspecified, it is set to the default Channel CRD // for the namespace (or cluster, in case there are no defaults for the namespace). @@ -77,6 +78,16 @@ type SequenceSpec struct { Reply *duckv1.Destination `json:"reply,omitempty"` } +type SequenceStep struct { + // Subscriber receiving the step event + Subscriber duckv1.Destination `json:"subscriber"` + + // Delivery is the delivery specification for events to the subscriber + // This includes things like retries, DLQ, etc. + // +optional + Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"` +} + type SequenceChannelStatus struct { // Channel is the reference to the underlying channel. Channel corev1.ObjectReference `json:"channel"` diff --git a/pkg/apis/flows/v1beta1/sequence_validation.go b/pkg/apis/flows/v1beta1/sequence_validation.go index ac91f63d1cd..c0a047e03a9 100644 --- a/pkg/apis/flows/v1beta1/sequence_validation.go +++ b/pkg/apis/flows/v1beta1/sequence_validation.go @@ -58,3 +58,15 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError { return errs } + +func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError { + errs := ss.Subscriber.Validate(ctx) + + if ss.Delivery != nil { + if de := ss.Delivery.Validate(ctx); de != nil { + errs = errs.Also(de.ViaField("delivery")) + } + } + + return errs +} diff --git a/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go b/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go index 935f90a4787..7282fc9f9c6 100644 --- a/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1beta1 import ( runtime "k8s.io/apimachinery/pkg/runtime" + duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" v1 "knative.dev/pkg/apis/duck/v1" ) @@ -312,7 +313,7 @@ func (in *SequenceSpec) DeepCopyInto(out *SequenceSpec) { *out = *in if in.Steps != nil { in, out := &in.Steps, &out.Steps - *out = make([]v1.Destination, len(*in)) + *out = make([]SequenceStep, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -372,6 +373,28 @@ func (in *SequenceStatus) DeepCopy() *SequenceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SequenceStep) DeepCopyInto(out *SequenceStep) { + *out = *in + in.Subscriber.DeepCopyInto(&out.Subscriber) + if in.Delivery != nil { + in, out := &in.Delivery, &out.Delivery + *out = new(duckv1beta1.DeliverySpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SequenceStep. +func (in *SequenceStep) DeepCopy() *SequenceStep { + if in == nil { + return nil + } + out := new(SequenceStep) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SequenceSubscriptionStatus) DeepCopyInto(out *SequenceSubscriptionStatus) { *out = *in From f5842d246e42181ad54db8b60ad9bec1bdf468ed Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 20 Feb 2020 09:41:12 -0500 Subject: [PATCH 2/4] add delivery to v1alpha1 for roundtripping purpose --- pkg/apis/flows/v1alpha1/sequence_defaults.go | 6 ++ .../flows/v1alpha1/sequence_defaults_test.go | 12 +-- pkg/apis/flows/v1alpha1/sequence_types.go | 14 ++- .../flows/v1alpha1/sequence_validation.go | 12 +++ .../v1alpha1/sequence_validation_test.go | 14 +-- .../flows/v1alpha1/zz_generated.deepcopy.go | 25 ++++- pkg/apis/flows/v1beta1/sequence_types.go | 2 +- .../sequence/resources/subscription.go | 4 +- pkg/reconciler/sequence/sequence_test.go | 92 ++++++++++++------- pkg/reconciler/testing/sequence.go | 2 +- test/e2e/sequence_test.go | 10 +- 11 files changed, 136 insertions(+), 57 deletions(-) diff --git a/pkg/apis/flows/v1alpha1/sequence_defaults.go b/pkg/apis/flows/v1alpha1/sequence_defaults.go index d9c9e63f0c4..13dddf59fff 100644 --- a/pkg/apis/flows/v1alpha1/sequence_defaults.go +++ b/pkg/apis/flows/v1alpha1/sequence_defaults.go @@ -46,3 +46,9 @@ func (ss *SequenceSpec) SetDefaults(ctx context.Context) { ss.Reply.SetDefaults(ctx) } } + +func (ss *SequenceStep) SetDefaults(ctx context.Context) { + ss.Subscriber.SetDefaults(ctx) + + // No delivery defaults. +} diff --git a/pkg/apis/flows/v1alpha1/sequence_defaults_test.go b/pkg/apis/flows/v1alpha1/sequence_defaults_test.go index 51053e23da4..be71fa2f2c0 100644 --- a/pkg/apis/flows/v1alpha1/sequence_defaults_test.go +++ b/pkg/apis/flows/v1alpha1/sequence_defaults_test.go @@ -68,9 +68,9 @@ func TestSequenceSetDefaults(t *testing.T) { initial: Sequence{ ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Name: "first"}}, - {Ref: &duckv1.KReference{Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Name: "first"}}}, + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Name: "reply"}, @@ -81,9 +81,9 @@ func TestSequenceSetDefaults(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ ChannelTemplate: defaultChannelTemplate, - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}, - {Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}}, + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Namespace: testNS, Name: "reply"}, diff --git a/pkg/apis/flows/v1alpha1/sequence_types.go b/pkg/apis/flows/v1alpha1/sequence_types.go index 1c2cd5bed5f..468226729b2 100644 --- a/pkg/apis/flows/v1alpha1/sequence_types.go +++ b/pkg/apis/flows/v1alpha1/sequence_types.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -66,7 +67,7 @@ var ( type SequenceSpec struct { // Steps is the list of Destinations (processors / functions) that will be called in the order // provided. - Steps []duckv1.Destination `json:"steps"` + Steps []SequenceStep `json:"steps"` // ChannelTemplate specifies which Channel CRD to use. If left unspecified, it is set to the default Channel CRD // for the namespace (or cluster, in case there are no defaults for the namespace). @@ -78,6 +79,17 @@ type SequenceSpec struct { Reply *duckv1.Destination `json:"reply,omitempty"` } +type SequenceStep struct { + // Subscriber receiving the step event + Subscriber duckv1.Destination `json:",inline"` + + // Delivery is the delivery specification for events to the subscriber + // This includes things like retries, DLQ, etc. + // Needed for Roundtripping v1alpha1 <-> v1beta1. + // +optional + Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"` +} + type SequenceChannelStatus struct { // Channel is the reference to the underlying channel. Channel corev1.ObjectReference `json:"channel"` diff --git a/pkg/apis/flows/v1alpha1/sequence_validation.go b/pkg/apis/flows/v1alpha1/sequence_validation.go index 9f7c8c4b47f..b7e37f50fc3 100644 --- a/pkg/apis/flows/v1alpha1/sequence_validation.go +++ b/pkg/apis/flows/v1alpha1/sequence_validation.go @@ -58,3 +58,15 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError { return errs } + +func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError { + errs := ss.Subscriber.Validate(ctx) + + if ss.Delivery != nil { + if de := ss.Delivery.Validate(ctx); de != nil { + errs = errs.Also(de.ViaField("delivery")) + } + } + + return errs +} diff --git a/pkg/apis/flows/v1alpha1/sequence_validation_test.go b/pkg/apis/flows/v1alpha1/sequence_validation_test.go index 48b9a5554c9..9367d878673 100644 --- a/pkg/apis/flows/v1alpha1/sequence_validation_test.go +++ b/pkg/apis/flows/v1alpha1/sequence_validation_test.go @@ -98,7 +98,7 @@ func TestSequenceSpecValidation(t *testing.T) { }, { name: "missing channeltemplatespec", ts: &SequenceSpec{ - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channelTemplate") @@ -108,7 +108,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "invalid channeltemplatespec missing APIVersion", ts: &SequenceSpec{ ChannelTemplate: &eventingduck.ChannelTemplateSpec{TypeMeta: metav1.TypeMeta{Kind: "mykind"}, Spec: &runtime.RawExtension{}}, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channelTemplate.apiVersion") @@ -118,7 +118,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "invalid channeltemplatespec missing Kind", ts: &SequenceSpec{ ChannelTemplate: &eventingduck.ChannelTemplateSpec{TypeMeta: metav1.TypeMeta{APIVersion: "myapiversion"}, Spec: &runtime.RawExtension{}}, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channelTemplate.kind") @@ -128,7 +128,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { return nil @@ -137,7 +137,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence with valid reply", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, Reply: makeValidReply("reply-channel"), }, want: func() *apis.FieldError { @@ -147,7 +147,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence with invalid missing name", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{ Namespace: "namespace", @@ -164,7 +164,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence with invalid reply", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, Reply: makeInvalidReply("reply-channel"), }, want: func() *apis.FieldError { diff --git a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go index 47c3117ce26..6be9d56e8a7 100644 --- a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + v1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" v1 "knative.dev/pkg/apis/duck/v1" ) @@ -312,7 +313,7 @@ func (in *SequenceSpec) DeepCopyInto(out *SequenceSpec) { *out = *in if in.Steps != nil { in, out := &in.Steps, &out.Steps - *out = make([]v1.Destination, len(*in)) + *out = make([]SequenceStep, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -372,6 +373,28 @@ func (in *SequenceStatus) DeepCopy() *SequenceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SequenceStep) DeepCopyInto(out *SequenceStep) { + *out = *in + in.Subscriber.DeepCopyInto(&out.Subscriber) + if in.Delivery != nil { + in, out := &in.Delivery, &out.Delivery + *out = new(v1beta1.DeliverySpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SequenceStep. +func (in *SequenceStep) DeepCopy() *SequenceStep { + if in == nil { + return nil + } + out := new(SequenceStep) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SequenceSubscriptionStatus) DeepCopyInto(out *SequenceSubscriptionStatus) { *out = *in diff --git a/pkg/apis/flows/v1beta1/sequence_types.go b/pkg/apis/flows/v1beta1/sequence_types.go index cf8be7127b5..2745467c422 100644 --- a/pkg/apis/flows/v1beta1/sequence_types.go +++ b/pkg/apis/flows/v1beta1/sequence_types.go @@ -80,7 +80,7 @@ type SequenceSpec struct { type SequenceStep struct { // Subscriber receiving the step event - Subscriber duckv1.Destination `json:"subscriber"` + Subscriber duckv1.Destination `json:",inline"` // Delivery is the delivery specification for events to the subscriber // This includes things like retries, DLQ, etc. diff --git a/pkg/reconciler/sequence/resources/subscription.go b/pkg/reconciler/sequence/resources/subscription.go index e401d7e294c..d46ece44f5d 100644 --- a/pkg/reconciler/sequence/resources/subscription.go +++ b/pkg/reconciler/sequence/resources/subscription.go @@ -53,8 +53,8 @@ func NewSubscription(stepNumber int, s *v1alpha1.Sequence) *messagingv1alpha1.Su Name: SequenceChannelName(s.Name, stepNumber), }, Subscriber: &duckv1.Destination{ - Ref: s.Spec.Steps[stepNumber].Ref, - URI: s.Spec.Steps[stepNumber].URI, + Ref: s.Spec.Steps[stepNumber].Subscriber.Ref, + URI: s.Spec.Steps[stepNumber].Subscriber.URI, }, }, } diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index 0c748289cb2..1f07fdbad34 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -140,20 +140,23 @@ func TestAllCases(t *testing.T) { reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), }, WantCreates: []runtime.Object{ createChannel(sequenceName, 0), - resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))), + resources.NewSubscription(0, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -192,7 +195,7 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), @@ -202,13 +205,13 @@ func TestAllCases(t *testing.T) { resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), @@ -248,10 +251,10 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceGeneration(sequenceGeneration), reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), @@ -260,20 +263,33 @@ func TestAllCases(t *testing.T) { createChannel(sequenceName, 0), createChannel(sequenceName, 1), createChannel(sequenceName, 2), - resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), - resources.NewSubscription(1, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), - resources.NewSubscription(2, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)})))}, + resources.NewSubscription(0, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))), + resources.NewSubscription(1, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, {Subscriber: createDestination(1)}, {Subscriber: createDestination(2)}}))), + resources.NewSubscription(2, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, {Subscriber: createDestination(1)}, {Subscriber: createDestination(2)}})))}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceGeneration(sequenceGeneration), reconciletesting.WithFlowsSequenceStatusObservedGeneration(sequenceGeneration), reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2), - }), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -356,10 +372,10 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), @@ -371,25 +387,33 @@ func TestAllCases(t *testing.T) { resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))), resources.NewSubscription(1, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))), resources.NewSubscription(2, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)})))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}})))}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2), - }), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -472,11 +496,11 @@ func TestAllCases(t *testing.T) { reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(1)})), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(1)}})), createChannel(sequenceName, 0), resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))), }, WantErr: false, WantEvents: []string{ @@ -488,13 +512,13 @@ func TestAllCases(t *testing.T) { WantCreates: []runtime.Object{ resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(1)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(1)}}))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(1)}), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(1)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), diff --git a/pkg/reconciler/testing/sequence.go b/pkg/reconciler/testing/sequence.go index 5ec0e9ec11e..26f1108688f 100644 --- a/pkg/reconciler/testing/sequence.go +++ b/pkg/reconciler/testing/sequence.go @@ -72,7 +72,7 @@ func WithFlowsSequenceChannelTemplateSpec(cts *eventingduckv1alpha1.ChannelTempl } } -func WithFlowsSequenceSteps(steps []duckv1.Destination) FlowsSequenceOption { +func WithFlowsSequenceSteps(steps []v1alpha1.SequenceStep) FlowsSequenceOption { return func(p *v1alpha1.Sequence) { p.Spec.Steps = steps } diff --git a/test/e2e/sequence_test.go b/test/e2e/sequence_test.go index c94878b53da..905341c05c6 100644 --- a/test/e2e/sequence_test.go +++ b/test/e2e/sequence_test.go @@ -29,6 +29,7 @@ import ( "knative.dev/eventing/test/lib/resources" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/eventing/pkg/apis/flows/v1alpha1" eventingtesting "knative.dev/eventing/pkg/reconciler/testing" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -62,7 +63,7 @@ func TestFlowsSequence(t *testing.T) { defer tearDown(client) // construct steps for the sequence - steps := make([]duckv1.Destination, 0) + steps := make([]v1alpha1.SequenceStep, 0) for _, config := range stepSubscriberConfigs { // create a stepper Pod with Service podName := config.podName @@ -71,9 +72,10 @@ func TestFlowsSequence(t *testing.T) { client.CreatePodOrFail(stepperPod, lib.WithService(podName)) // create a new step - step := duckv1.Destination{ - Ref: resources.KnativeRefForService(podName, client.Namespace), - } + step := v1alpha1.SequenceStep{ + Subscriber: duckv1.Destination{ + Ref: resources.KnativeRefForService(podName, client.Namespace), + }} // add the step into steps steps = append(steps, step) } From 2c579f5926e466f7b26fae8f9a6ded61341586ae Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 20 Feb 2020 10:15:57 -0500 Subject: [PATCH 3/4] use v1alpha1 deliveryspec --- pkg/apis/flows/v1alpha1/sequence_types.go | 3 +-- pkg/apis/flows/v1alpha1/sequence_validation.go | 6 ------ pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go | 3 +-- 3 files changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/apis/flows/v1alpha1/sequence_types.go b/pkg/apis/flows/v1alpha1/sequence_types.go index 468226729b2..8652bbe53a8 100644 --- a/pkg/apis/flows/v1alpha1/sequence_types.go +++ b/pkg/apis/flows/v1alpha1/sequence_types.go @@ -22,7 +22,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" - eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -87,7 +86,7 @@ type SequenceStep struct { // This includes things like retries, DLQ, etc. // Needed for Roundtripping v1alpha1 <-> v1beta1. // +optional - Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"` + Delivery *eventingduckv1alpha1.DeliverySpec `json:"delivery,omitempty"` } type SequenceChannelStatus struct { diff --git a/pkg/apis/flows/v1alpha1/sequence_validation.go b/pkg/apis/flows/v1alpha1/sequence_validation.go index b7e37f50fc3..2c8783e5e09 100644 --- a/pkg/apis/flows/v1alpha1/sequence_validation.go +++ b/pkg/apis/flows/v1alpha1/sequence_validation.go @@ -62,11 +62,5 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError { func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError { errs := ss.Subscriber.Validate(ctx) - if ss.Delivery != nil { - if de := ss.Delivery.Validate(ctx); de != nil { - errs = errs.Also(de.ViaField("delivery")) - } - } - return errs } diff --git a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go index 6be9d56e8a7..569c1396c8c 100644 --- a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go @@ -23,7 +23,6 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" - v1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" v1 "knative.dev/pkg/apis/duck/v1" ) @@ -379,7 +378,7 @@ func (in *SequenceStep) DeepCopyInto(out *SequenceStep) { in.Subscriber.DeepCopyInto(&out.Subscriber) if in.Delivery != nil { in, out := &in.Delivery, &out.Delivery - *out = new(v1beta1.DeliverySpec) + *out = new(duckv1alpha1.DeliverySpec) (*in).DeepCopyInto(*out) } return From 73f9fdefdc95de00c5ae55874fedb1f9268835d6 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 20 Feb 2020 12:47:20 -0500 Subject: [PATCH 4/4] use v1beta1 DeliverySpec after all --- pkg/apis/flows/v1alpha1/sequence_types.go | 3 ++- pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/apis/flows/v1alpha1/sequence_types.go b/pkg/apis/flows/v1alpha1/sequence_types.go index 8652bbe53a8..468226729b2 100644 --- a/pkg/apis/flows/v1alpha1/sequence_types.go +++ b/pkg/apis/flows/v1alpha1/sequence_types.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -86,7 +87,7 @@ type SequenceStep struct { // This includes things like retries, DLQ, etc. // Needed for Roundtripping v1alpha1 <-> v1beta1. // +optional - Delivery *eventingduckv1alpha1.DeliverySpec `json:"delivery,omitempty"` + Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"` } type SequenceChannelStatus struct { diff --git a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go index 569c1396c8c..6be9d56e8a7 100644 --- a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + v1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" v1 "knative.dev/pkg/apis/duck/v1" ) @@ -378,7 +379,7 @@ func (in *SequenceStep) DeepCopyInto(out *SequenceStep) { in.Subscriber.DeepCopyInto(&out.Subscriber) if in.Delivery != nil { in, out := &in.Delivery, &out.Delivery - *out = new(duckv1alpha1.DeliverySpec) + *out = new(v1beta1.DeliverySpec) (*in).DeepCopyInto(*out) } return