diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 506dea4c9c3..1f5a26d987a 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/logconfig" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" @@ -46,6 +45,8 @@ import ( "knative.dev/eventing/pkg/apis/eventing" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + "knative.dev/eventing/pkg/apis/flows" + flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" flowsv1beta1 "knative.dev/eventing/pkg/apis/flows/v1beta1" "knative.dev/eventing/pkg/apis/messaging" channeldefaultconfig "knative.dev/eventing/pkg/apis/messaging/config" @@ -55,6 +56,7 @@ import ( sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" sourcesv1alpha2 "knative.dev/eventing/pkg/apis/sources/v1alpha2" "knative.dev/eventing/pkg/leaderelection" + "knative.dev/eventing/pkg/logconfig" "knative.dev/eventing/pkg/reconciler/sinkbinding" ) @@ -93,6 +95,9 @@ var ourTypes = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{ // v1beta1 flowsv1beta1.SchemeGroupVersion.WithKind("Parallel"): &flowsv1beta1.Parallel{}, flowsv1beta1.SchemeGroupVersion.WithKind("Sequence"): &flowsv1beta1.Sequence{}, + // v1 + flowsv1.SchemeGroupVersion.WithKind("Parallel"): &flowsv1.Parallel{}, + flowsv1.SchemeGroupVersion.WithKind("Sequence"): &flowsv1.Sequence{}, // For group configs.knative.dev configsv1alpha1.SchemeGroupVersion.WithKind("ConfigMapPropagation"): &configsv1alpha1.ConfigMapPropagation{}, @@ -226,6 +231,8 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro eventingv1_ = eventingv1.SchemeGroupVersion.Version messagingv1beta1_ = messagingv1beta1.SchemeGroupVersion.Version messagingv1_ = messagingv1.SchemeGroupVersion.Version + flowsv1beta1_ = flowsv1beta1.SchemeGroupVersion.Version + flowsv1_ = flowsv1.SchemeGroupVersion.Version sourcesv1alpha1_ = sourcesv1alpha1.SchemeGroupVersion.Version sourcesv1alpha2_ = sourcesv1alpha2.SchemeGroupVersion.Version ) @@ -264,6 +271,24 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro }, }, + // flows + flowsv1.Kind("Sequence"): { + DefinitionName: flows.SequenceResource.String(), + HubVersion: flowsv1beta1_, + Zygotes: map[string]conversion.ConvertibleObject{ + flowsv1beta1_: &flowsv1beta1.Sequence{}, + flowsv1_: &flowsv1.Sequence{}, + }, + }, + flowsv1.Kind("Parallel"): { + DefinitionName: flows.ParallelResource.String(), + HubVersion: flowsv1beta1_, + Zygotes: map[string]conversion.ConvertibleObject{ + flowsv1beta1_: &flowsv1beta1.Parallel{}, + flowsv1_: &flowsv1.Parallel{}, + }, + }, + // Sources sourcesv1alpha2.Kind("ApiServerSource"): { DefinitionName: sources.ApiServerSourceResource.String(), diff --git a/pkg/apis/duck/v1beta1/delivery_conversion.go b/pkg/apis/duck/v1beta1/delivery_conversion.go index 8601032c943..12ddd79ef40 100644 --- a/pkg/apis/duck/v1beta1/delivery_conversion.go +++ b/pkg/apis/duck/v1beta1/delivery_conversion.go @@ -31,12 +31,14 @@ func (source *DeliverySpec) ConvertTo(ctx context.Context, to apis.Convertible) case *eventingduckv1.DeliverySpec: sink.Retry = source.Retry sink.BackoffDelay = source.BackoffDelay - if *source.BackoffPolicy == BackoffPolicyLinear { - linear := eventingduckv1.BackoffPolicyLinear - sink.BackoffPolicy = &linear - } else if *source.BackoffPolicy == BackoffPolicyExponential { - exponential := eventingduckv1.BackoffPolicyExponential - sink.BackoffPolicy = &exponential + if source.BackoffPolicy != nil { + if *source.BackoffPolicy == BackoffPolicyLinear { + linear := eventingduckv1.BackoffPolicyLinear + sink.BackoffPolicy = &linear + } else if *source.BackoffPolicy == BackoffPolicyExponential { + exponential := eventingduckv1.BackoffPolicyExponential + sink.BackoffPolicy = &exponential + } } sink.DeadLetterSink = source.DeadLetterSink return nil @@ -51,12 +53,14 @@ func (sink *DeliverySpec) ConvertFrom(ctx context.Context, from apis.Convertible case *eventingduckv1.DeliverySpec: sink.Retry = source.Retry sink.BackoffDelay = source.BackoffDelay - if *source.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { - linear := BackoffPolicyLinear - sink.BackoffPolicy = &linear - } else if *source.BackoffPolicy == eventingduckv1.BackoffPolicyExponential { - exponential := BackoffPolicyExponential - sink.BackoffPolicy = &exponential + if source.BackoffPolicy != nil { + if *source.BackoffPolicy == eventingduckv1.BackoffPolicyLinear { + linear := BackoffPolicyLinear + sink.BackoffPolicy = &linear + } else if *source.BackoffPolicy == eventingduckv1.BackoffPolicyExponential { + exponential := BackoffPolicyExponential + sink.BackoffPolicy = &exponential + } } sink.DeadLetterSink = source.DeadLetterSink return nil diff --git a/pkg/apis/flows/v1beta1/parallel_conversion.go b/pkg/apis/flows/v1beta1/parallel_conversion.go index 4324ba79b2c..327297a2d6b 100644 --- a/pkg/apis/flows/v1beta1/parallel_conversion.go +++ b/pkg/apis/flows/v1beta1/parallel_conversion.go @@ -21,14 +21,138 @@ import ( "fmt" "knative.dev/pkg/apis" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" ) // ConvertTo implements apis.Convertible -func (source *Parallel) ConvertTo(ctx context.Context, sink apis.Convertible) error { - return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink) +// Converts obj from v1beta1.Parallel into v1.Parallel +func (source *Parallel) ConvertTo(ctx context.Context, obj apis.Convertible) error { + switch sink := obj.(type) { + case *v1.Parallel: + sink.ObjectMeta = source.ObjectMeta + + sink.Spec.Branches = make([]v1.ParallelBranch, len(source.Spec.Branches)) + for i, b := range source.Spec.Branches { + sink.Spec.Branches[i] = v1.ParallelBranch{ + Filter: b.Filter, + Subscriber: b.Subscriber, + Reply: b.Reply, + } + + if b.Delivery != nil { + sink.Spec.Branches[i].Delivery = &eventingduckv1.DeliverySpec{} + if err := b.Delivery.ConvertTo(ctx, sink.Spec.Branches[i].Delivery); err != nil { + return err + } + } + } + + if source.Spec.ChannelTemplate != nil { + sink.Spec.ChannelTemplate = &messagingv1.ChannelTemplateSpec{ + TypeMeta: source.Spec.ChannelTemplate.TypeMeta, + Spec: source.Spec.ChannelTemplate.Spec, + } + } + sink.Spec.Reply = source.Spec.Reply + + sink.Status.Status = source.Status.Status + sink.Status.AddressStatus = source.Status.AddressStatus + + sink.Status.IngressChannelStatus = v1.ParallelChannelStatus{ + Channel: source.Status.IngressChannelStatus.Channel, + ReadyCondition: source.Status.IngressChannelStatus.ReadyCondition, + } + + if source.Status.BranchStatuses != nil { + sink.Status.BranchStatuses = make([]v1.ParallelBranchStatus, len(source.Status.BranchStatuses)) + for i, b := range source.Status.BranchStatuses { + sink.Status.BranchStatuses[i] = v1.ParallelBranchStatus{ + FilterSubscriptionStatus: v1.ParallelSubscriptionStatus{ + Subscription: b.FilterSubscriptionStatus.Subscription, + ReadyCondition: b.FilterSubscriptionStatus.ReadyCondition, + }, + FilterChannelStatus: v1.ParallelChannelStatus{ + Channel: b.FilterChannelStatus.Channel, + ReadyCondition: b.FilterChannelStatus.ReadyCondition, + }, + SubscriptionStatus: v1.ParallelSubscriptionStatus{ + Subscription: b.SubscriptionStatus.Subscription, + ReadyCondition: b.SubscriptionStatus.ReadyCondition, + }, + } + } + } + + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", sink) + } } // ConvertFrom implements apis.Convertible -func (sink *Parallel) ConvertFrom(ctx context.Context, source apis.Convertible) error { - return fmt.Errorf("v1beta1 is the highest known version, got: %T", source) +// Converts obj from v1.Parallel into v1beta1.Parallel +func (sink *Parallel) ConvertFrom(ctx context.Context, obj apis.Convertible) error { + switch source := obj.(type) { + case *v1.Parallel: + sink.ObjectMeta = source.ObjectMeta + + sink.Spec.Branches = make([]ParallelBranch, len(source.Spec.Branches)) + for i, b := range source.Spec.Branches { + sink.Spec.Branches[i] = ParallelBranch{ + Filter: b.Filter, + Subscriber: b.Subscriber, + Reply: b.Reply, + } + if b.Delivery != nil { + sink.Spec.Branches[i].Delivery = &eventingduckv1beta1.DeliverySpec{} + if err := sink.Spec.Branches[i].Delivery.ConvertFrom(ctx, b.Delivery); err != nil { + return err + } + } + } + if source.Spec.ChannelTemplate != nil { + sink.Spec.ChannelTemplate = &messagingv1beta1.ChannelTemplateSpec{ + TypeMeta: source.Spec.ChannelTemplate.TypeMeta, + Spec: source.Spec.ChannelTemplate.Spec, + } + } + sink.Spec.Reply = source.Spec.Reply + + sink.Status.Status = source.Status.Status + sink.Status.AddressStatus = source.Status.AddressStatus + + sink.Status.IngressChannelStatus = ParallelChannelStatus{ + Channel: source.Status.IngressChannelStatus.Channel, + ReadyCondition: source.Status.IngressChannelStatus.ReadyCondition, + } + + if source.Status.BranchStatuses != nil { + sink.Status.BranchStatuses = make([]ParallelBranchStatus, len(source.Status.BranchStatuses)) + for i, b := range source.Status.BranchStatuses { + sink.Status.BranchStatuses[i] = ParallelBranchStatus{ + FilterSubscriptionStatus: ParallelSubscriptionStatus{ + Subscription: b.FilterSubscriptionStatus.Subscription, + ReadyCondition: b.FilterSubscriptionStatus.ReadyCondition, + }, + FilterChannelStatus: ParallelChannelStatus{ + Channel: b.FilterChannelStatus.Channel, + ReadyCondition: b.FilterChannelStatus.ReadyCondition, + }, + SubscriptionStatus: ParallelSubscriptionStatus{ + Subscription: b.SubscriptionStatus.Subscription, + ReadyCondition: b.SubscriptionStatus.ReadyCondition, + }, + } + } + } + + return nil + default: + return fmt.Errorf("unknown version, got: %T", source) + } } diff --git a/pkg/apis/flows/v1beta1/parallel_conversion_test.go b/pkg/apis/flows/v1beta1/parallel_conversion_test.go index 5063422aaed..88a42eba906 100644 --- a/pkg/apis/flows/v1beta1/parallel_conversion_test.go +++ b/pkg/apis/flows/v1beta1/parallel_conversion_test.go @@ -19,10 +19,24 @@ package v1beta1 import ( "context" "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + "knative.dev/pkg/apis" + + duckv1 "knative.dev/pkg/apis/duck/v1" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" ) func TestParallelConversionBadType(t *testing.T) { - good, bad := &Parallel{}, &Parallel{} + good, bad := &Parallel{}, &Sequence{} if err := good.ConvertTo(context.Background(), bad); err == nil { t.Errorf("ConvertTo() = %#v, wanted error", bad) @@ -32,3 +46,421 @@ func TestParallelConversionBadType(t *testing.T) { t.Errorf("ConvertFrom() = %#v, wanted error", good) } } + +// Test v1beta1 -> v1 -> v1beta1 +func TestParallelRoundTripV1beta1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&v1.Parallel{}} + linear := eventingduckv1beta1.BackoffPolicyLinear + + tests := []struct { + name string + in *Parallel + }{{ + name: "min configuration", + in: &Parallel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "par-name", + Namespace: "par-ns", + Generation: 17, + }, + Spec: ParallelSpec{ + Branches: []ParallelBranch{}, + }, + }, + }, { + name: "full configuration", + in: &Parallel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "par-name", + Namespace: "par-ns", + Generation: 17, + }, + Spec: ParallelSpec{ + Branches: []ParallelBranch{ + { + Filter: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "f1Kind", + Namespace: "f1Namespace", + Name: "f1Name", + APIVersion: "f1APIVersion", + }, + URI: apis.HTTP("f1.example.com")}, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s1Kind", + Namespace: "s1Namespace", + Name: "s1Name", + APIVersion: "s1APIVersion", + }, + URI: apis.HTTP("s1.example.com")}, + + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "reply1Kind", + Namespace: "reply1Namespace", + Name: "reply1Name", + APIVersion: "reply1APIVersion", + }, + URI: apis.HTTP("reply1.example.com"), + }, + Delivery: &eventingduckv1beta1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "d1Kind", + Namespace: "d1Namespace", + Name: "d1Name", + APIVersion: "d1APIVersion", + }, + URI: apis.HTTP("d1.example.com")}, + Retry: pointer.Int32Ptr(1), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("1m"), + }, + }, + { + Filter: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "f2Kind", + Namespace: "f2Namespace", + Name: "f2Name", + APIVersion: "f2APIVersion", + }, + URI: apis.HTTP("f2.example.com")}, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s2Kind", + Namespace: "s2Namespace", + Name: "s2Name", + APIVersion: "s2APIVersion", + }, + URI: apis.HTTP("s2.example.com")}, + + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "reply2Kind", + Namespace: "reply2Namespace", + Name: "reply2Name", + APIVersion: "reply2APIVersion", + }, + URI: apis.HTTP("reply2.example.com"), + }, + Delivery: &eventingduckv1beta1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "d2Kind", + Namespace: "d2Namespace", + Name: "d2Name", + APIVersion: "d2APIVersion", + }, + URI: apis.HTTP("d2.example.com")}, + Retry: pointer.Int32Ptr(1), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("1m"), + }, + }, + }, + ChannelTemplate: &messagingv1beta1.ChannelTemplateSpec{ + TypeMeta: metav1.TypeMeta{ + Kind: "channelKind", + APIVersion: "channelAPIVersion", + }, + }, + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "replyKind", + Namespace: "replyNamespace", + Name: "replyName", + APIVersion: "replyAPIVersion", + }, + URI: apis.HTTP("reply.example.com"), + }, + }, + Status: ParallelStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTP("addressstatus.example.com"), + }, + }, + IngressChannelStatus: ParallelChannelStatus{ + Channel: corev1.ObjectReference{ + Kind: "i-channel-kind", + APIVersion: "i-channel-apiversion", + Name: "i-channel-name", + Namespace: "i-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "i1-msg"}, + }, + BranchStatuses: []ParallelBranchStatus{ + { + FilterSubscriptionStatus: ParallelSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + Kind: "f1-sub-kind", + APIVersion: "f1-sub-apiversion", + Name: "f1-sub-name", + Namespace: "f1-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "f1-msg"}, + }, + SubscriptionStatus: ParallelSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + Kind: "s1-sub-kind", + APIVersion: "s1-sub-apiversion", + Name: "s1-sub-name", + Namespace: "s1-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "s1-msg"}, + }, + FilterChannelStatus: ParallelChannelStatus{ + Channel: corev1.ObjectReference{ + Kind: "s1-channel-kind", + APIVersion: "s1-channel-apiversion", + Name: "s1-channel-name", + Namespace: "s1-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "c1-msg"}, + }, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := test.in.ConvertTo(context.Background(), ver); err != nil { + t.Errorf("ConvertTo() = %v", err) + } + got := &Parallel{} + if err := got.ConvertFrom(context.Background(), ver); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Test v1 -> v1beta1 -> v1 +func TestParallelRoundTripV1(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&Parallel{}} + linear := eventingduckv1.BackoffPolicyLinear + + tests := []struct { + name string + in *v1.Parallel + }{{ + name: "min configuration", + in: &v1.Parallel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "par-name", + Namespace: "par-ns", + Generation: 17, + }, + Spec: v1.ParallelSpec{ + Branches: []v1.ParallelBranch{}, + }, + }, + }, { + name: "full configuration", + in: &v1.Parallel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "par-name", + Namespace: "par-ns", + Generation: 17, + }, + Spec: v1.ParallelSpec{ + Branches: []v1.ParallelBranch{ + { + Filter: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "f1Kind", + Namespace: "f1Namespace", + Name: "f1Name", + APIVersion: "f1APIVersion", + }, + URI: apis.HTTP("f1.example.com")}, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s1Kind", + Namespace: "s1Namespace", + Name: "s1Name", + APIVersion: "s1APIVersion", + }, + URI: apis.HTTP("s1.example.com")}, + + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "reply1Kind", + Namespace: "reply1Namespace", + Name: "reply1Name", + APIVersion: "reply1APIVersion", + }, + URI: apis.HTTP("reply1.example.com"), + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "d1Kind", + Namespace: "d1Namespace", + Name: "d1Name", + APIVersion: "d1APIVersion", + }, + URI: apis.HTTP("d1.example.com")}, + Retry: pointer.Int32Ptr(1), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("1m"), + }, + }, + { + Filter: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "f2Kind", + Namespace: "f2Namespace", + Name: "f2Name", + APIVersion: "f2APIVersion", + }, + URI: apis.HTTP("f2.example.com")}, + + Subscriber: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s2Kind", + Namespace: "s2Namespace", + Name: "s2Name", + APIVersion: "s2APIVersion", + }, + URI: apis.HTTP("s2.example.com")}, + + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "reply2Kind", + Namespace: "reply2Namespace", + Name: "reply2Name", + APIVersion: "reply2APIVersion", + }, + URI: apis.HTTP("reply2.example.com"), + }, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "d2Kind", + Namespace: "d2Namespace", + Name: "d2Name", + APIVersion: "d2APIVersion", + }, + URI: apis.HTTP("d2.example.com")}, + Retry: pointer.Int32Ptr(1), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("1m"), + }, + }, + }, + ChannelTemplate: &messagingv1.ChannelTemplateSpec{ + TypeMeta: metav1.TypeMeta{ + Kind: "channelKind", + APIVersion: "channelAPIVersion", + }, + }, + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "replyKind", + Namespace: "replyNamespace", + Name: "replyName", + APIVersion: "replyAPIVersion", + }, + URI: apis.HTTP("reply.example.com"), + }, + }, + Status: v1.ParallelStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTP("addressstatus.example.com"), + }, + }, + IngressChannelStatus: v1.ParallelChannelStatus{ + Channel: corev1.ObjectReference{ + Kind: "i-channel-kind", + APIVersion: "i-channel-apiversion", + Name: "i-channel-name", + Namespace: "i-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "i1-msg"}, + }, + BranchStatuses: []v1.ParallelBranchStatus{ + { + FilterSubscriptionStatus: v1.ParallelSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + Kind: "f1-sub-kind", + APIVersion: "f1-sub-apiversion", + Name: "f1-sub-name", + Namespace: "f1-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "f1-msg"}, + }, + SubscriptionStatus: v1.ParallelSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + Kind: "s1-sub-kind", + APIVersion: "s1-sub-apiversion", + Name: "s1-sub-name", + Namespace: "s1-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "s1-msg"}, + }, + FilterChannelStatus: v1.ParallelChannelStatus{ + Channel: corev1.ObjectReference{ + Kind: "s1-channel-kind", + APIVersion: "s1-channel-apiversion", + Name: "s1-channel-name", + Namespace: "s1-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "c1-msg"}, + }, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := ver.ConvertFrom(context.Background(), test.in); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + got := &v1.Parallel{} + if err := ver.ConvertTo(context.Background(), got); err != nil { + t.Errorf("ConvertTo() = %v", err) + } + + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} diff --git a/pkg/apis/flows/v1beta1/sequence_conversion.go b/pkg/apis/flows/v1beta1/sequence_conversion.go index 8e9aad9a515..e79da8c3cb3 100644 --- a/pkg/apis/flows/v1beta1/sequence_conversion.go +++ b/pkg/apis/flows/v1beta1/sequence_conversion.go @@ -21,14 +21,125 @@ import ( "fmt" "knative.dev/pkg/apis" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" ) // ConvertTo implements apis.Convertible -func (source *Sequence) ConvertTo(ctx context.Context, sink apis.Convertible) error { - return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink) +// Converts obj from v1beta1.Sequence into v1.Sequence +func (source *Sequence) ConvertTo(ctx context.Context, obj apis.Convertible) error { + switch sink := obj.(type) { + case *v1.Sequence: + sink.ObjectMeta = source.ObjectMeta + + sink.Spec.Steps = make([]v1.SequenceStep, len(source.Spec.Steps)) + for i, s := range source.Spec.Steps { + sink.Spec.Steps[i] = v1.SequenceStep{ + Destination: s.Destination, + } + + if s.Delivery != nil { + sink.Spec.Steps[i].Delivery = &eventingduckv1.DeliverySpec{} + if err := s.Delivery.ConvertTo(ctx, sink.Spec.Steps[i].Delivery); err != nil { + return err + } + } + } + + if source.Spec.ChannelTemplate != nil { + sink.Spec.ChannelTemplate = &messagingv1.ChannelTemplateSpec{ + TypeMeta: source.Spec.ChannelTemplate.TypeMeta, + Spec: source.Spec.ChannelTemplate.Spec, + } + } + sink.Spec.Reply = source.Spec.Reply + + sink.Status.Status = source.Status.Status + sink.Status.AddressStatus = source.Status.AddressStatus + + if source.Status.SubscriptionStatuses != nil { + sink.Status.SubscriptionStatuses = make([]v1.SequenceSubscriptionStatus, len(source.Status.SubscriptionStatuses)) + for i, s := range source.Status.SubscriptionStatuses { + sink.Status.SubscriptionStatuses[i] = v1.SequenceSubscriptionStatus{ + Subscription: s.Subscription, + ReadyCondition: s.ReadyCondition, + } + } + } + + if source.Status.ChannelStatuses != nil { + sink.Status.ChannelStatuses = make([]v1.SequenceChannelStatus, len(source.Status.ChannelStatuses)) + for i, s := range source.Status.ChannelStatuses { + sink.Status.ChannelStatuses[i] = v1.SequenceChannelStatus{ + Channel: s.Channel, + ReadyCondition: s.ReadyCondition, + } + } + } + + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", sink) + } } // ConvertFrom implements apis.Convertible -func (sink *Sequence) ConvertFrom(ctx context.Context, source apis.Convertible) error { - return fmt.Errorf("v1beta1 is the highest known version, got: %T", source) +func (sink *Sequence) ConvertFrom(ctx context.Context, obj apis.Convertible) error { + switch source := obj.(type) { + case *v1.Sequence: + sink.ObjectMeta = source.ObjectMeta + + sink.Spec.Steps = make([]SequenceStep, len(source.Spec.Steps)) + for i, s := range source.Spec.Steps { + sink.Spec.Steps[i] = SequenceStep{ + Destination: s.Destination, + } + if s.Delivery != nil { + sink.Spec.Steps[i].Delivery = &eventingduckv1beta1.DeliverySpec{} + if err := sink.Spec.Steps[i].Delivery.ConvertFrom(ctx, s.Delivery); err != nil { + return err + } + } + } + + if source.Spec.ChannelTemplate != nil { + sink.Spec.ChannelTemplate = &messagingv1beta1.ChannelTemplateSpec{ + TypeMeta: source.Spec.ChannelTemplate.TypeMeta, + Spec: source.Spec.ChannelTemplate.Spec, + } + } + + sink.Spec.Reply = source.Spec.Reply + + sink.Status.Status = source.Status.Status + sink.Status.AddressStatus = source.Status.AddressStatus + + if source.Status.SubscriptionStatuses != nil { + sink.Status.SubscriptionStatuses = make([]SequenceSubscriptionStatus, len(source.Status.SubscriptionStatuses)) + for i, s := range source.Status.SubscriptionStatuses { + sink.Status.SubscriptionStatuses[i] = SequenceSubscriptionStatus{ + Subscription: s.Subscription, + ReadyCondition: s.ReadyCondition, + } + } + } + + if source.Status.ChannelStatuses != nil { + sink.Status.ChannelStatuses = make([]SequenceChannelStatus, len(source.Status.ChannelStatuses)) + for i, s := range source.Status.ChannelStatuses { + sink.Status.ChannelStatuses[i] = SequenceChannelStatus{ + Channel: s.Channel, + ReadyCondition: s.ReadyCondition, + } + } + } + + return nil + default: + return fmt.Errorf("unknown version, got: %T", source) + } } diff --git a/pkg/apis/flows/v1beta1/sequence_conversion_test.go b/pkg/apis/flows/v1beta1/sequence_conversion_test.go index 1e600bc4180..1d6a0c12e80 100644 --- a/pkg/apis/flows/v1beta1/sequence_conversion_test.go +++ b/pkg/apis/flows/v1beta1/sequence_conversion_test.go @@ -19,10 +19,23 @@ package v1beta1 import ( "context" "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + "knative.dev/pkg/apis" + + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + v1 "knative.dev/eventing/pkg/apis/flows/v1" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" + duckv1 "knative.dev/pkg/apis/duck/v1" ) func TestSequenceConversionBadType(t *testing.T) { - good, bad := &Sequence{}, &Sequence{} + good, bad := &Sequence{}, &Parallel{} if err := good.ConvertTo(context.Background(), bad); err == nil { t.Errorf("ConvertTo() = %#v, wanted error", bad) @@ -32,3 +45,348 @@ func TestSequenceConversionBadType(t *testing.T) { t.Errorf("ConvertFrom() = %#v, wanted error", good) } } + +// Test v1beta1 -> v1 -> v1beta1 +func TestSequenceRoundTripV1beta1(t *testing.T) { + versions := []apis.Convertible{&v1.Sequence{}} + linear := eventingduckv1beta1.BackoffPolicyLinear + + tests := []struct { + name string + in *Sequence + }{{ + name: "min configuration", + in: &Sequence{ + ObjectMeta: metav1.ObjectMeta{ + Name: "seq-name", + Namespace: "seq-ns", + Generation: 17, + }, + Spec: SequenceSpec{ + Steps: []SequenceStep{}, + }, + }, + }, { + name: "full configuration", + in: &Sequence{ + ObjectMeta: metav1.ObjectMeta{ + Name: "seq-name", + Namespace: "seq-ns", + Generation: 17, + }, + Spec: SequenceSpec{ + Steps: []SequenceStep{ + { + Destination: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s1Kind", + Namespace: "s1Namespace", + Name: "s1Name", + APIVersion: "s1APIVersion", + }, + URI: apis.HTTP("s1.example.com")}, + Delivery: &eventingduckv1beta1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "dl1Kind", + Namespace: "dl1Namespace", + Name: "dl1Name", + APIVersion: "dl1APIVersion", + }, + URI: apis.HTTP("subscriber.dls1.example.com"), + }, + Retry: pointer.Int32Ptr(5), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("5s"), + }, + }, + { + Destination: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s2Kind", + Namespace: "s2Namespace", + Name: "s2Name", + APIVersion: "s2APIVersion", + }, + URI: apis.HTTP("s2.example.com")}, + Delivery: &eventingduckv1beta1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "dl2Kind", + Namespace: "dl2Namespace", + Name: "dl2Name", + APIVersion: "dl2APIVersion", + }, + URI: apis.HTTP("subscriber.dls2.example.com"), + }, + Retry: pointer.Int32Ptr(7), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("8s"), + }, + }, + }, + ChannelTemplate: &messagingv1beta1.ChannelTemplateSpec{ + TypeMeta: metav1.TypeMeta{ + Kind: "channelKind", + APIVersion: "channelAPIVersion", + }, + }, + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "replyKind", + Namespace: "replyNamespace", + Name: "replyName", + APIVersion: "replyAPIVersion", + }, + URI: apis.HTTP("reply.example.com"), + }, + }, + Status: SequenceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTP("addressstatus.example.com"), + }, + }, + SubscriptionStatuses: []SequenceSubscriptionStatus{ + { + Subscription: corev1.ObjectReference{ + Kind: "s1-sub-kind", + APIVersion: "s1-sub-apiversion", + Name: "s1-sub-name", + Namespace: "s1-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "s1-msg"}, + }, + { + Subscription: corev1.ObjectReference{ + Kind: "s2-sub-kind", + APIVersion: "s2-sub-apiversion", + Name: "s2-sub-name", + Namespace: "s2-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "s2-msg"}, + }, + }, + ChannelStatuses: []SequenceChannelStatus{ + { + Channel: corev1.ObjectReference{ + Kind: "s1-channel-kind", + APIVersion: "s1-channel-apiversion", + Name: "s1-channel-name", + Namespace: "s1-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "s1-msg"}, + }, + { + Channel: corev1.ObjectReference{ + Kind: "s2-channel-kind", + APIVersion: "s2-channel-apiversion", + Name: "s2-channel-name", + Namespace: "s2-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "s2-msg"}, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := test.in.ConvertTo(context.Background(), ver); err != nil { + t.Errorf("ConvertTo() = %v", err) + } + got := &Sequence{} + if err := got.ConvertFrom(context.Background(), ver); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} + +// Test v1 -> v1beta1 -> v1 +func TestSequenceRoundTripV1(t *testing.T) { + versions := []apis.Convertible{&Sequence{}} + + linear := eventingduckv1.BackoffPolicyLinear + + tests := []struct { + name string + in *v1.Sequence + }{{ + name: "min configuration", + in: &v1.Sequence{ + ObjectMeta: metav1.ObjectMeta{ + Name: "seq-name", + Namespace: "seq-ns", + Generation: 17, + }, + Spec: v1.SequenceSpec{ + Steps: []v1.SequenceStep{}, + }, + }, + }, { + name: "full configuration", + in: &v1.Sequence{ + ObjectMeta: metav1.ObjectMeta{ + Name: "seq-name", + Namespace: "seq-ns", + Generation: 17, + }, + Spec: v1.SequenceSpec{ + Steps: []v1.SequenceStep{ + { + Destination: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s1Kind", + Namespace: "s1Namespace", + Name: "s1Name", + APIVersion: "s1APIVersion", + }, + URI: apis.HTTP("s1.example.com")}, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "dl1Kind", + Namespace: "dl1Namespace", + Name: "dl1Name", + APIVersion: "dl1APIVersion", + }, + URI: apis.HTTP("subscriber.dls1.example.com"), + }, + Retry: pointer.Int32Ptr(5), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("5s"), + }, + }, + { + Destination: duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "s2Kind", + Namespace: "s2Namespace", + Name: "s2Name", + APIVersion: "s2APIVersion", + }, + URI: apis.HTTP("s2.example.com")}, + Delivery: &eventingduckv1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "dl2Kind", + Namespace: "dl2Namespace", + Name: "dl2Name", + APIVersion: "dl2APIVersion", + }, + URI: apis.HTTP("subscriber.dls2.example.com"), + }, + Retry: pointer.Int32Ptr(7), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("8s"), + }, + }, + }, + ChannelTemplate: &messagingv1.ChannelTemplateSpec{ + TypeMeta: metav1.TypeMeta{ + Kind: "channelKind", + APIVersion: "channelAPIVersion", + }, + }, + Reply: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "replyKind", + Namespace: "replyNamespace", + Name: "replyName", + APIVersion: "replyAPIVersion", + }, + URI: apis.HTTP("reply.example.com"), + }, + }, + Status: v1.SequenceStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, + AddressStatus: duckv1.AddressStatus{ + Address: &duckv1.Addressable{ + URL: apis.HTTP("addressstatus.example.com"), + }, + }, + SubscriptionStatuses: []v1.SequenceSubscriptionStatus{ + { + Subscription: corev1.ObjectReference{ + Kind: "s1-sub-kind", + APIVersion: "s1-sub-apiversion", + Name: "s1-sub-name", + Namespace: "s1-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "s1-msg"}, + }, + { + Subscription: corev1.ObjectReference{ + Kind: "s2-sub-kind", + APIVersion: "s2-sub-apiversion", + Name: "s2-sub-name", + Namespace: "s2-sub-namespace", + }, + ReadyCondition: apis.Condition{Message: "s2-msg"}, + }, + }, + ChannelStatuses: []v1.SequenceChannelStatus{ + { + Channel: corev1.ObjectReference{ + Kind: "s1-channel-kind", + APIVersion: "s1-channel-apiversion", + Name: "s1-channel-name", + Namespace: "s1-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "s1-msg"}, + }, + { + Channel: corev1.ObjectReference{ + Kind: "s2-channel-kind", + APIVersion: "s2-channel-apiversion", + Name: "s2-channel-name", + Namespace: "s2-channel-namespace", + }, + ReadyCondition: apis.Condition{Message: "s2-msg"}, + }, + }, + }, + }, + }} + + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := ver.ConvertFrom(context.Background(), test.in); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + got := &v1.Sequence{} + if err := ver.ConvertTo(context.Background(), got); err != nil { + t.Errorf("ConvertFrom() = %v", err) + } + + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +}