Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/apis/flows/v1alpha1/sequence_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ func (ss *SequenceSpec) SetDefaults(ctx context.Context) {
ss.Reply.SetDefaults(ctx)
}
}

func (ss *SequenceStep) SetDefaults(ctx context.Context) {
ss.Subscriber.SetDefaults(ctx)

// No delivery defaults.
}
12 changes: 6 additions & 6 deletions pkg/apis/flows/v1alpha1/sequence_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func TestSequenceSetDefaults(t *testing.T) {
initial: Sequence{
ObjectMeta: metav1.ObjectMeta{Namespace: testNS},
Spec: SequenceSpec{
Steps: []duckv1.Destination{
{Ref: &duckv1.KReference{Name: "first"}},
{Ref: &duckv1.KReference{Name: "second"}},
Steps: []SequenceStep{
{Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Name: "first"}}},
{Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Name: "second"}}},
},
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{Name: "reply"},
Expand All @@ -81,9 +81,9 @@ func TestSequenceSetDefaults(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: testNS},
Spec: SequenceSpec{
ChannelTemplate: defaultChannelTemplate,
Steps: []duckv1.Destination{
{Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}},
{Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}},
Steps: []SequenceStep{
{Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}},
{Subscriber: duckv1.Destination{Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}},
},
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{Namespace: testNS, Name: "reply"},
Expand Down
14 changes: 13 additions & 1 deletion pkg/apis/flows/v1alpha1/sequence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
Expand Down Expand Up @@ -66,7 +67,7 @@ var (
type SequenceSpec struct {
// Steps is the list of Destinations (processors / functions) that will be called in the order
// provided.
Steps []duckv1.Destination `json:"steps"`
Steps []SequenceStep `json:"steps"`

// ChannelTemplate specifies which Channel CRD to use. If left unspecified, it is set to the default Channel CRD
// for the namespace (or cluster, in case there are no defaults for the namespace).
Expand All @@ -78,6 +79,17 @@ type SequenceSpec struct {
Reply *duckv1.Destination `json:"reply,omitempty"`
}

type SequenceStep struct {
// Subscriber receiving the step event
Subscriber duckv1.Destination `json:",inline"`

// Delivery is the delivery specification for events to the subscriber
// This includes things like retries, DLQ, etc.
// Needed for Roundtripping v1alpha1 <-> v1beta1.
// +optional
Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"`
}

type SequenceChannelStatus struct {
// Channel is the reference to the underlying channel.
Channel corev1.ObjectReference `json:"channel"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/apis/flows/v1alpha1/sequence_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,9 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError {

return errs
}

func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError {
errs := ss.Subscriber.Validate(ctx)

return errs
}
14 changes: 7 additions & 7 deletions pkg/apis/flows/v1alpha1/sequence_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestSequenceSpecValidation(t *testing.T) {
}, {
name: "missing channeltemplatespec",
ts: &SequenceSpec{
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channelTemplate")
Expand All @@ -108,7 +108,7 @@ func TestSequenceSpecValidation(t *testing.T) {
name: "invalid channeltemplatespec missing APIVersion",
ts: &SequenceSpec{
ChannelTemplate: &eventingduck.ChannelTemplateSpec{TypeMeta: metav1.TypeMeta{Kind: "mykind"}, Spec: &runtime.RawExtension{}},
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channelTemplate.apiVersion")
Expand All @@ -118,7 +118,7 @@ func TestSequenceSpecValidation(t *testing.T) {
name: "invalid channeltemplatespec missing Kind",
ts: &SequenceSpec{
ChannelTemplate: &eventingduck.ChannelTemplateSpec{TypeMeta: metav1.TypeMeta{APIVersion: "myapiversion"}, Spec: &runtime.RawExtension{}},
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
},
want: func() *apis.FieldError {
fe := apis.ErrMissingField("channelTemplate.kind")
Expand All @@ -128,7 +128,7 @@ func TestSequenceSpecValidation(t *testing.T) {
name: "valid sequence",
ts: &SequenceSpec{
ChannelTemplate: validChannelTemplate,
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
},
want: func() *apis.FieldError {
return nil
Expand All @@ -137,7 +137,7 @@ func TestSequenceSpecValidation(t *testing.T) {
name: "valid sequence with valid reply",
ts: &SequenceSpec{
ChannelTemplate: validChannelTemplate,
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
Reply: makeValidReply("reply-channel"),
},
want: func() *apis.FieldError {
Expand All @@ -147,7 +147,7 @@ func TestSequenceSpecValidation(t *testing.T) {
name: "valid sequence with invalid missing name",
ts: &SequenceSpec{
ChannelTemplate: validChannelTemplate,
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{
Namespace: "namespace",
Expand All @@ -164,7 +164,7 @@ func TestSequenceSpecValidation(t *testing.T) {
name: "valid sequence with invalid reply",
ts: &SequenceSpec{
ChannelTemplate: validChannelTemplate,
Steps: []duckv1.Destination{{URI: subscriberURI}},
Steps: []SequenceStep{{Subscriber: duckv1.Destination{URI: subscriberURI}}},
Reply: makeInvalidReply("reply-channel"),
},
want: func() *apis.FieldError {
Expand Down
25 changes: 24 additions & 1 deletion pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/apis/flows/v1beta1/sequence_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ func (ss *SequenceSpec) SetDefaults(ctx context.Context) {
ss.Reply.SetDefaults(ctx)
}
}

func (ss *SequenceStep) SetDefaults(ctx context.Context) {
ss.Subscriber.SetDefaults(ctx)

// No delivery defaults.
}
16 changes: 10 additions & 6 deletions pkg/apis/flows/v1beta1/sequence_defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@ func TestSequenceSetDefaults(t *testing.T) {
initial: Sequence{
ObjectMeta: metav1.ObjectMeta{Namespace: testNS},
Spec: SequenceSpec{
Steps: []duckv1.Destination{
{Ref: &duckv1.KReference{Name: "first"}},
{Ref: &duckv1.KReference{Name: "second"}},
Steps: []SequenceStep{
{Subscriber: duckv1.Destination{
Ref: &duckv1.KReference{Name: "first"}}},
{Subscriber: duckv1.Destination{
Ref: &duckv1.KReference{Name: "second"}}},
},
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{Name: "reply"},
Expand All @@ -81,9 +83,11 @@ func TestSequenceSetDefaults(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: testNS},
Spec: SequenceSpec{
ChannelTemplate: defaultChannelTemplate,
Steps: []duckv1.Destination{
{Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}},
{Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}},
Steps: []SequenceStep{
{Subscriber: duckv1.Destination{
Ref: &duckv1.KReference{Namespace: testNS, Name: "first"}}},
{Subscriber: duckv1.Destination{
Ref: &duckv1.KReference{Namespace: testNS, Name: "second"}}},
},
Reply: &duckv1.Destination{
Ref: &duckv1.KReference{Namespace: testNS, Name: "reply"},
Expand Down
15 changes: 13 additions & 2 deletions pkg/apis/flows/v1beta1/sequence_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -64,8 +65,8 @@ var (

type SequenceSpec struct {
// Steps is the list of Destinations (processors / functions) that will be called in the order
// provided.
Steps []duckv1.Destination `json:"steps"`
// provided. Each step has its own delivery options
Steps []SequenceStep `json:"steps"`

// ChannelTemplate specifies which Channel CRD to use. If left unspecified, it is set to the default Channel CRD
// for the namespace (or cluster, in case there are no defaults for the namespace).
Expand All @@ -77,6 +78,16 @@ type SequenceSpec struct {
Reply *duckv1.Destination `json:"reply,omitempty"`
}

type SequenceStep struct {
// Subscriber receiving the step event
Subscriber duckv1.Destination `json:",inline"`

// Delivery is the delivery specification for events to the subscriber
// This includes things like retries, DLQ, etc.
// +optional
Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"`
Comment thread
lionelvillard marked this conversation as resolved.
}

type SequenceChannelStatus struct {
// Channel is the reference to the underlying channel.
Channel corev1.ObjectReference `json:"channel"`
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/flows/v1beta1/sequence_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,15 @@ func (ps *SequenceSpec) Validate(ctx context.Context) *apis.FieldError {

return errs
}

func (ss *SequenceStep) Validate(ctx context.Context) *apis.FieldError {
errs := ss.Subscriber.Validate(ctx)

if ss.Delivery != nil {
if de := ss.Delivery.Validate(ctx); de != nil {
errs = errs.Also(de.ViaField("delivery"))
}
}

return errs
}
25 changes: 24 additions & 1 deletion pkg/apis/flows/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/reconciler/sequence/resources/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ func NewSubscription(stepNumber int, s *v1alpha1.Sequence) *messagingv1alpha1.Su
Name: SequenceChannelName(s.Name, stepNumber),
},
Subscriber: &duckv1.Destination{
Ref: s.Spec.Steps[stepNumber].Ref,
URI: s.Spec.Steps[stepNumber].URI,
Ref: s.Spec.Steps[stepNumber].Subscriber.Ref,
URI: s.Spec.Steps[stepNumber].Subscriber.URI,
},
},
}
Expand Down
Loading