diff --git a/pkg/apis/flows/v1alpha1/sequence_defaults.go b/pkg/apis/flows/v1alpha1/sequence_defaults.go index d9c9e63f0c4..13dddf59fff 100644 --- a/pkg/apis/flows/v1alpha1/sequence_defaults.go +++ b/pkg/apis/flows/v1alpha1/sequence_defaults.go @@ -46,3 +46,9 @@ func (ss *SequenceSpec) SetDefaults(ctx context.Context) { ss.Reply.SetDefaults(ctx) } } + +func (ss *SequenceStep) SetDefaults(ctx context.Context) { + ss.Subscriber.SetDefaults(ctx) + + // No delivery defaults. +} diff --git a/pkg/apis/flows/v1alpha1/sequence_defaults_test.go b/pkg/apis/flows/v1alpha1/sequence_defaults_test.go index 51053e23da4..be71fa2f2c0 100644 --- a/pkg/apis/flows/v1alpha1/sequence_defaults_test.go +++ b/pkg/apis/flows/v1alpha1/sequence_defaults_test.go @@ -68,9 +68,9 @@ func TestSequenceSetDefaults(t *testing.T) { initial: Sequence{ ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Name: "first"}}, - {Ref: &duckv1.KReference{Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Name: "first"}}}, + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Name: "reply"}, @@ -81,9 +81,9 @@ func TestSequenceSetDefaults(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ ChannelTemplate: defaultChannelTemplate, - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}, - {Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}}, + {Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Namespace: testNS, Name: "reply"}, diff --git a/pkg/apis/flows/v1alpha1/sequence_types.go b/pkg/apis/flows/v1alpha1/sequence_types.go index 1c2cd5bed5f..468226729b2 100644 --- a/pkg/apis/flows/v1alpha1/sequence_types.go +++ b/pkg/apis/flows/v1alpha1/sequence_types.go @@ -22,6 +22,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -66,7 +67,7 @@ var ( type SequenceSpec struct { // Steps is the list of Destinations (processors / functions) that will be called in the order // provided. - Steps []duckv1.Destination `json:"steps"` + Steps []SequenceStep `json:"steps"` // ChannelTemplate specifies which Channel CRD to use. If left unspecified, it is set to the default Channel CRD // for the namespace (or cluster, in case there are no defaults for the namespace). @@ -78,6 +79,17 @@ type SequenceSpec struct { Reply *duckv1.Destination `json:"reply,omitempty"` } +type SequenceStep struct { + // Subscriber receiving the step event + Subscriber duckv1.Destination `json:",inline"` + + // Delivery is the delivery specification for events to the subscriber + // This includes things like retries, DLQ, etc. + // Needed for Roundtripping v1alpha1 <-> v1beta1. + // +optional + Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"` +} + type SequenceChannelStatus struct { // Channel is the reference to the underlying channel. Channel corev1.ObjectReference `json:"channel"` diff --git a/pkg/apis/flows/v1alpha1/sequence_validation.go b/pkg/apis/flows/v1alpha1/sequence_validation.go index 9f7c8c4b47f..2c8783e5e09 100644 --- a/pkg/apis/flows/v1alpha1/sequence_validation.go +++ b/pkg/apis/flows/v1alpha1/sequence_validation.go @@ -58,3 +58,9 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError { return errs } + +func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError { + errs := ss.Subscriber.Validate(ctx) + + return errs +} diff --git a/pkg/apis/flows/v1alpha1/sequence_validation_test.go b/pkg/apis/flows/v1alpha1/sequence_validation_test.go index 48b9a5554c9..9367d878673 100644 --- a/pkg/apis/flows/v1alpha1/sequence_validation_test.go +++ b/pkg/apis/flows/v1alpha1/sequence_validation_test.go @@ -98,7 +98,7 @@ func TestSequenceSpecValidation(t *testing.T) { }, { name: "missing channeltemplatespec", ts: &SequenceSpec{ - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channelTemplate") @@ -108,7 +108,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "invalid channeltemplatespec missing APIVersion", ts: &SequenceSpec{ ChannelTemplate: &eventingduck.ChannelTemplateSpec{TypeMeta: metav1.TypeMeta{Kind: "mykind"}, Spec: &runtime.RawExtension{}}, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channelTemplate.apiVersion") @@ -118,7 +118,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "invalid channeltemplatespec missing Kind", ts: &SequenceSpec{ ChannelTemplate: &eventingduck.ChannelTemplateSpec{TypeMeta: metav1.TypeMeta{APIVersion: "myapiversion"}, Spec: &runtime.RawExtension{}}, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { fe := apis.ErrMissingField("channelTemplate.kind") @@ -128,7 +128,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, }, want: func() *apis.FieldError { return nil @@ -137,7 +137,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence with valid reply", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, Reply: makeValidReply("reply-channel"), }, want: func() *apis.FieldError { @@ -147,7 +147,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence with invalid missing name", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{ Namespace: "namespace", @@ -164,7 +164,7 @@ func TestSequenceSpecValidation(t *testing.T) { name: "valid sequence with invalid reply", ts: &SequenceSpec{ ChannelTemplate: validChannelTemplate, - Steps: []duckv1.Destination{{URI: subscriberURI}}, + Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}}, Reply: makeInvalidReply("reply-channel"), }, want: func() *apis.FieldError { diff --git a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go index 47c3117ce26..6be9d56e8a7 100644 --- a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( runtime "k8s.io/apimachinery/pkg/runtime" duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + v1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" v1 "knative.dev/pkg/apis/duck/v1" ) @@ -312,7 +313,7 @@ func (in *SequenceSpec) DeepCopyInto(out *SequenceSpec) { *out = *in if in.Steps != nil { in, out := &in.Steps, &out.Steps - *out = make([]v1.Destination, len(*in)) + *out = make([]SequenceStep, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -372,6 +373,28 @@ func (in *SequenceStatus) DeepCopy() *SequenceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SequenceStep) DeepCopyInto(out *SequenceStep) { + *out = *in + in.Subscriber.DeepCopyInto(&out.Subscriber) + if in.Delivery != nil { + in, out := &in.Delivery, &out.Delivery + *out = new(v1beta1.DeliverySpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SequenceStep. +func (in *SequenceStep) DeepCopy() *SequenceStep { + if in == nil { + return nil + } + out := new(SequenceStep) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SequenceSubscriptionStatus) DeepCopyInto(out *SequenceSubscriptionStatus) { *out = *in diff --git a/pkg/apis/flows/v1beta1/sequence_defaults.go b/pkg/apis/flows/v1beta1/sequence_defaults.go index 260bccb44f0..34b7f46863e 100644 --- a/pkg/apis/flows/v1beta1/sequence_defaults.go +++ b/pkg/apis/flows/v1beta1/sequence_defaults.go @@ -46,3 +46,9 @@ func (ss *SequenceSpec) SetDefaults(ctx context.Context) { ss.Reply.SetDefaults(ctx) } } + +func (ss *SequenceStep) SetDefaults(ctx context.Context) { + ss.Subscriber.SetDefaults(ctx) + + // No delivery defaults. +} diff --git a/pkg/apis/flows/v1beta1/sequence_defaults_test.go b/pkg/apis/flows/v1beta1/sequence_defaults_test.go index 5882c30c2d8..9d7f574533c 100644 --- a/pkg/apis/flows/v1beta1/sequence_defaults_test.go +++ b/pkg/apis/flows/v1beta1/sequence_defaults_test.go @@ -68,9 +68,11 @@ func TestSequenceSetDefaults(t *testing.T) { initial: Sequence{ ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Name: "first"}}, - {Ref: &duckv1.KReference{Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Name: "first"}}}, + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Name: "reply"}, @@ -81,9 +83,11 @@ func TestSequenceSetDefaults(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: testNS}, Spec: SequenceSpec{ ChannelTemplate: defaultChannelTemplate, - Steps: []duckv1.Destination{ - {Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}, - {Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}, + Steps: []SequenceStep{ + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}}, + {Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}}, }, Reply: &duckv1.Destination{ Ref: &duckv1.KReference{Namespace: testNS, Name: "reply"}, diff --git a/pkg/apis/flows/v1beta1/sequence_types.go b/pkg/apis/flows/v1beta1/sequence_types.go index 60a637f9ae9..2745467c422 100644 --- a/pkg/apis/flows/v1beta1/sequence_types.go +++ b/pkg/apis/flows/v1beta1/sequence_types.go @@ -21,6 +21,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -64,8 +65,8 @@ var ( type SequenceSpec struct { // Steps is the list of Destinations (processors / functions) that will be called in the order - // provided. - Steps []duckv1.Destination `json:"steps"` + // provided. Each step has its own delivery options + Steps []SequenceStep `json:"steps"` // ChannelTemplate specifies which Channel CRD to use. If left unspecified, it is set to the default Channel CRD // for the namespace (or cluster, in case there are no defaults for the namespace). @@ -77,6 +78,16 @@ type SequenceSpec struct { Reply *duckv1.Destination `json:"reply,omitempty"` } +type SequenceStep struct { + // Subscriber receiving the step event + Subscriber duckv1.Destination `json:",inline"` + + // Delivery is the delivery specification for events to the subscriber + // This includes things like retries, DLQ, etc. + // +optional + Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"` +} + type SequenceChannelStatus struct { // Channel is the reference to the underlying channel. Channel corev1.ObjectReference `json:"channel"` diff --git a/pkg/apis/flows/v1beta1/sequence_validation.go b/pkg/apis/flows/v1beta1/sequence_validation.go index ac91f63d1cd..c0a047e03a9 100644 --- a/pkg/apis/flows/v1beta1/sequence_validation.go +++ b/pkg/apis/flows/v1beta1/sequence_validation.go @@ -58,3 +58,15 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError { return errs } + +func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError { + errs := ss.Subscriber.Validate(ctx) + + if ss.Delivery != nil { + if de := ss.Delivery.Validate(ctx); de != nil { + errs = errs.Also(de.ViaField("delivery")) + } + } + + return errs +} diff --git a/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go b/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go index 935f90a4787..7282fc9f9c6 100644 --- a/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go +++ b/pkg/apis/flows/v1beta1/zz_generated.deepcopy.go @@ -22,6 +22,7 @@ package v1beta1 import ( runtime "k8s.io/apimachinery/pkg/runtime" + duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" v1 "knative.dev/pkg/apis/duck/v1" ) @@ -312,7 +313,7 @@ func (in *SequenceSpec) DeepCopyInto(out *SequenceSpec) { *out = *in if in.Steps != nil { in, out := &in.Steps, &out.Steps - *out = make([]v1.Destination, len(*in)) + *out = make([]SequenceStep, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } @@ -372,6 +373,28 @@ func (in *SequenceStatus) DeepCopy() *SequenceStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SequenceStep) DeepCopyInto(out *SequenceStep) { + *out = *in + in.Subscriber.DeepCopyInto(&out.Subscriber) + if in.Delivery != nil { + in, out := &in.Delivery, &out.Delivery + *out = new(duckv1beta1.DeliverySpec) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SequenceStep. +func (in *SequenceStep) DeepCopy() *SequenceStep { + if in == nil { + return nil + } + out := new(SequenceStep) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SequenceSubscriptionStatus) DeepCopyInto(out *SequenceSubscriptionStatus) { *out = *in diff --git a/pkg/reconciler/sequence/resources/subscription.go b/pkg/reconciler/sequence/resources/subscription.go index e401d7e294c..d46ece44f5d 100644 --- a/pkg/reconciler/sequence/resources/subscription.go +++ b/pkg/reconciler/sequence/resources/subscription.go @@ -53,8 +53,8 @@ func NewSubscription(stepNumber int, s *v1alpha1.Sequence) *messagingv1alpha1.Su Name: SequenceChannelName(s.Name, stepNumber), }, Subscriber: &duckv1.Destination{ - Ref: s.Spec.Steps[stepNumber].Ref, - URI: s.Spec.Steps[stepNumber].URI, + Ref: s.Spec.Steps[stepNumber].Subscriber.Ref, + URI: s.Spec.Steps[stepNumber].Subscriber.URI, }, }, } diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index 0c748289cb2..1f07fdbad34 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -140,20 +140,23 @@ func TestAllCases(t *testing.T) { reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), }, WantCreates: []runtime.Object{ createChannel(sequenceName, 0), - resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))), + resources.NewSubscription(0, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -192,7 +195,7 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), @@ -202,13 +205,13 @@ func TestAllCases(t *testing.T) { resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), @@ -248,10 +251,10 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceGeneration(sequenceGeneration), reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), @@ -260,20 +263,33 @@ func TestAllCases(t *testing.T) { createChannel(sequenceName, 0), createChannel(sequenceName, 1), createChannel(sequenceName, 2), - resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), - resources.NewSubscription(1, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), - resources.NewSubscription(2, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)})))}, + resources.NewSubscription(0, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))), + resources.NewSubscription(1, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, {Subscriber: createDestination(1)}, {Subscriber: createDestination(2)}}))), + resources.NewSubscription(2, + reconciletesting.NewFlowsSequence(sequenceName, testNS, + reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, {Subscriber: createDestination(1)}, {Subscriber: createDestination(2)}})))}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceGeneration(sequenceGeneration), reconciletesting.WithFlowsSequenceStatusObservedGeneration(sequenceGeneration), reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2), - }), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -356,10 +372,10 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2)}))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))}, WantErr: false, WantEvents: []string{ Eventf(corev1.EventTypeNormal, "SequenceReconciled", `Sequence reconciled: "test-namespace/test-sequence"`), @@ -371,25 +387,33 @@ func TestAllCases(t *testing.T) { resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))), resources.NewSubscription(1, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}))), resources.NewSubscription(2, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0), createDestination(1), createDestination(2)})))}, + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}})))}, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceReply(createReplyChannel(replyChannelName)), reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{ - createDestination(0), - createDestination(1), - createDestination(2), - }), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{ + {Subscriber: createDestination(0)}, + {Subscriber: createDestination(1)}, + {Subscriber: createDestination(2)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -472,11 +496,11 @@ func TestAllCases(t *testing.T) { reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(1)})), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(1)}})), createChannel(sequenceName, 0), resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(0)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(0)}}))), }, WantErr: false, WantEvents: []string{ @@ -488,13 +512,13 @@ func TestAllCases(t *testing.T) { WantCreates: []runtime.Object{ resources.NewSubscription(0, reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(1)}))), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(1)}}))), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: reconciletesting.NewFlowsSequence(sequenceName, testNS, reconciletesting.WithInitFlowsSequenceConditions, reconciletesting.WithFlowsSequenceChannelTemplateSpec(imc), - reconciletesting.WithFlowsSequenceSteps([]duckv1.Destination{createDestination(1)}), + reconciletesting.WithFlowsSequenceSteps([]v1alpha1.SequenceStep{{Subscriber: createDestination(1)}}), reconciletesting.WithFlowsSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), reconciletesting.WithFlowsSequenceAddressableNotReady("emptyAddress", "addressable is nil"), reconciletesting.WithFlowsSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), diff --git a/pkg/reconciler/testing/sequence.go b/pkg/reconciler/testing/sequence.go index 5ec0e9ec11e..26f1108688f 100644 --- a/pkg/reconciler/testing/sequence.go +++ b/pkg/reconciler/testing/sequence.go @@ -72,7 +72,7 @@ func WithFlowsSequenceChannelTemplateSpec(cts *eventingduckv1alpha1.ChannelTempl } } -func WithFlowsSequenceSteps(steps []duckv1.Destination) FlowsSequenceOption { +func WithFlowsSequenceSteps(steps []v1alpha1.SequenceStep) FlowsSequenceOption { return func(p *v1alpha1.Sequence) { p.Spec.Steps = steps } diff --git a/test/e2e/sequence_test.go b/test/e2e/sequence_test.go index c94878b53da..905341c05c6 100644 --- a/test/e2e/sequence_test.go +++ b/test/e2e/sequence_test.go @@ -29,6 +29,7 @@ import ( "knative.dev/eventing/test/lib/resources" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/eventing/pkg/apis/flows/v1alpha1" eventingtesting "knative.dev/eventing/pkg/reconciler/testing" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -62,7 +63,7 @@ func TestFlowsSequence(t *testing.T) { defer tearDown(client) // construct steps for the sequence - steps := make([]duckv1.Destination, 0) + steps := make([]v1alpha1.SequenceStep, 0) for _, config := range stepSubscriberConfigs { // create a stepper Pod with Service podName := config.podName @@ -71,9 +72,10 @@ func TestFlowsSequence(t *testing.T) { client.CreatePodOrFail(stepperPod, lib.WithService(podName)) // create a new step - step := duckv1.Destination{ - Ref: resources.KnativeRefForService(podName, client.Namespace), - } + step := v1alpha1.SequenceStep{ + Subscriber: duckv1.Destination{ + Ref: resources.KnativeRefForService(podName, client.Namespace), + }} // add the step into steps steps = append(steps, step) }