diff --git a/cmd/controller/main.go b/cmd/controller/main.go index edd16fe2b72..944d96d0f51 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -35,6 +35,7 @@ import ( "github.com/knative/eventing/pkg/reconciler/channel" "github.com/knative/eventing/pkg/reconciler/eventtype" "github.com/knative/eventing/pkg/reconciler/namespace" + "github.com/knative/eventing/pkg/reconciler/pipeline" "github.com/knative/eventing/pkg/reconciler/subscription" "github.com/knative/eventing/pkg/reconciler/trigger" "github.com/knative/pkg/configmap" @@ -81,7 +82,7 @@ func main() { logger.Info("Starting the controller") - const numControllers = 6 + const numControllers = 7 cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) @@ -97,6 +98,9 @@ func main() { brokerInformer := eventingInformerFactory.Eventing().V1alpha1().Brokers() eventTypeInformer := eventingInformerFactory.Eventing().V1alpha1().EventTypes() + // Messaging + pipelineInformer := eventingInformerFactory.Messaging().V1alpha1().Pipelines() + // Kube serviceInformer := kubeInformerFactory.Core().V1().Services() namespaceInformer := kubeInformerFactory.Core().V1().Namespaces() @@ -165,6 +169,12 @@ func main() { eventTypeInformer, brokerInformer, ), + pipeline.NewController( + opt, + pipelineInformer, + addressableInformer, + subscriptionInformer, + ), } // This line asserts at compile time that the length of controllers is equal to numControllers. // It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that @@ -191,6 +201,7 @@ func main() { subscriptionInformer.Informer(), triggerInformer.Informer(), eventTypeInformer.Informer(), + pipelineInformer.Informer(), // Kube configMapInformer.Informer(), serviceInformer.Informer(), diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index cf734475777..dfecaa942ef 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -76,6 +76,16 @@ rules: verbs: - "update" + # Our own resources and statuses we care about. + - apiGroups: + - "messaging.knative.dev" + resources: + - "pipelines" + - "pipelines/status" + - "inmemorychannels" + - "inmemorychannels/status" + verbs: *everything + # Source resources and statuses we care about. - apiGroups: - "sources.eventing.knative.dev" diff --git a/config/300-pipeline.yaml b/config/300-pipeline.yaml new file mode 100644 index 00000000000..be2da2656c6 --- /dev/null +++ b/config/300-pipeline.yaml @@ -0,0 +1,96 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: pipelines.messaging.knative.dev + labels: + knative.dev/crd-install: "true" +spec: + group: messaging.knative.dev + version: v1alpha1 + names: + kind: Pipeline + plural: pipelines + singular: pipeline + categories: + - all + - knative + - eventing + - messaging + scope: Namespaced + subresources: + status: {} + additionalPrinterColumns: + - name: Ready + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].status" + - name: Reason + type: string + JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason" + - name: Age + type: date + JSONPath: .metadata.creationTimestamp + validation: + openAPIV3Schema: + properties: + spec: + required: + - steps + - channelTemplate + properties: + steps: + type: array + items: + type: object + properties: + dnsName: + type: string + minLength: 1 + uri: + type: string + minLength: 1 + ref: + type: object + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + namespace: + type: string + maxLength: 0 + channelTemplate: + type: object + required: + - apiVersion + - kind + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + spec: + type: object diff --git a/pkg/apis/eventing/v1alpha1/register.go b/pkg/apis/eventing/v1alpha1/register.go index 977cad56aaf..320b6be84b5 100644 --- a/pkg/apis/eventing/v1alpha1/register.go +++ b/pkg/apis/eventing/v1alpha1/register.go @@ -51,12 +51,12 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ChannelList{}, &ClusterChannelProvisioner{}, &ClusterChannelProvisionerList{}, + &EventType{}, + &EventTypeList{}, &Subscription{}, &SubscriptionList{}, &Trigger{}, &TriggerList{}, - &EventType{}, - &EventTypeList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go b/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go index b4da810c854..03f527b36ba 100644 --- a/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go +++ b/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation.go @@ -31,10 +31,10 @@ func isChannelEmpty(f corev1.ObjectReference) bool { // Valid if it is a valid object reference. func isValidChannel(f corev1.ObjectReference) *apis.FieldError { - return isValidObjectReference(f) + return IsValidObjectReference(f) } -func isValidObjectReference(f corev1.ObjectReference) *apis.FieldError { +func IsValidObjectReference(f corev1.ObjectReference) *apis.FieldError { return checkRequiredObjectReferenceFields(f). Also(checkDisallowedObjectReferenceFields(f)) } diff --git a/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation_test.go b/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation_test.go index 453f6bccec2..79890cf36f9 100644 --- a/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/subscribable_channelable_validation_test.go @@ -141,7 +141,7 @@ func TestIsValidObjectReference(t *testing.T) { for _, fe := range test.want { allWanted = allWanted.Also(fe) } - got := isValidObjectReference(test.ref) + got := IsValidObjectReference(test.ref) if diff := cmp.Diff(allWanted.Error(), got.Error()); diff != "" { t.Errorf("%s: validation (-want, +got) = %v", test.name, diff) } diff --git a/pkg/apis/eventing/v1alpha1/subscription_validation.go b/pkg/apis/eventing/v1alpha1/subscription_validation.go index adb45a01bcd..3e6e9923f3e 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_validation.go +++ b/pkg/apis/eventing/v1alpha1/subscription_validation.go @@ -52,7 +52,7 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError { } if !missingSubscriber { - if fe := isValidSubscriberSpec(*ss.Subscriber); fe != nil { + if fe := IsValidSubscriberSpec(*ss.Subscriber); fe != nil { errs = errs.Also(fe.ViaField("subscriber")) } } @@ -78,7 +78,7 @@ func isSubscriberSpecNilOrEmpty(s *SubscriberSpec) bool { return false } -func isValidSubscriberSpec(s SubscriberSpec) *apis.FieldError { +func IsValidSubscriberSpec(s SubscriberSpec) *apis.FieldError { var errs *apis.FieldError fieldsSet := make([]string, 0, 0) @@ -99,7 +99,7 @@ func isValidSubscriberSpec(s SubscriberSpec) *apis.FieldError { // If Ref given, check the fields. if s.Ref != nil && !equality.Semantic.DeepEqual(s.Ref, &corev1.ObjectReference{}) { - fe := isValidObjectReference(*s.Ref) + fe := IsValidObjectReference(*s.Ref) if fe != nil { errs = errs.Also(fe.ViaField("ref")) } @@ -112,7 +112,7 @@ func isReplyStrategyNilOrEmpty(r *ReplyStrategy) bool { } func isValidReply(r ReplyStrategy) *apis.FieldError { - if fe := isValidObjectReference(*r.Channel); fe != nil { + if fe := IsValidObjectReference(*r.Channel); fe != nil { return fe.ViaField("channel") } return nil diff --git a/pkg/apis/eventing/v1alpha1/subscription_validation_test.go b/pkg/apis/eventing/v1alpha1/subscription_validation_test.go index cf7c27a4ef7..1b5e869eed3 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/subscription_validation_test.go @@ -625,7 +625,7 @@ func TestValidgetValidSubscriber(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - got := isValidSubscriberSpec(test.s) + got := IsValidSubscriberSpec(test.s) if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { t.Errorf("%s: isValidSubscriber (-want, +got) = %v", test.name, diff) } diff --git a/pkg/apis/eventing/v1alpha1/trigger_validation.go b/pkg/apis/eventing/v1alpha1/trigger_validation.go index 79aed96f714..dd0b30ca60b 100644 --- a/pkg/apis/eventing/v1alpha1/trigger_validation.go +++ b/pkg/apis/eventing/v1alpha1/trigger_validation.go @@ -47,7 +47,7 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError { if isSubscriberSpecNilOrEmpty(ts.Subscriber) { fe := apis.ErrMissingField("subscriber") errs = errs.Also(fe) - } else if fe := isValidSubscriberSpec(*ts.Subscriber); fe != nil { + } else if fe := IsValidSubscriberSpec(*ts.Subscriber); fe != nil { errs = errs.Also(fe.ViaField("subscriber")) } diff --git a/pkg/apis/messaging/v1alpha1/pipeline_defaults.go b/pkg/apis/messaging/v1alpha1/pipeline_defaults.go new file mode 100644 index 00000000000..6b5ae07d68f --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/pipeline_defaults.go @@ -0,0 +1,27 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import "context" + +func (s *Pipeline) SetDefaults(ctx context.Context) { + s.Spec.SetDefaults(ctx) +} + +func (ss *PipelineSpec) SetDefaults(ctx context.Context) { + // TODO anything? +} diff --git a/pkg/apis/messaging/v1alpha1/pipeline_lifecycle.go b/pkg/apis/messaging/v1alpha1/pipeline_lifecycle.go new file mode 100644 index 00000000000..9bccb747256 --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/pipeline_lifecycle.go @@ -0,0 +1,172 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/pkg/apis" + pkgduckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" +) + +var pCondSet = apis.NewLivingConditionSet(PipelineConditionReady, PipelineConditionChannelsReady, PipelineConditionSubscriptionsReady, PipelineConditionAddressable) + +const ( + // PipelineConditionReady has status True when all subconditions below have been set to True. + PipelineConditionReady = apis.ConditionReady + + // PipelineChannelsReady has status True when all the channels created as part of + // this pipeline are ready. + PipelineConditionChannelsReady apis.ConditionType = "ChannelsReady" + + // PipelineSubscriptionsReady has status True when all the subscriptions created as part of + // this pipeline are ready. + PipelineConditionSubscriptionsReady apis.ConditionType = "SubscriptionsReady" + + // PipelineConditionAddressable has status true when this Pipeline meets + // the Addressable contract and has a non-empty hostname. + PipelineConditionAddressable apis.ConditionType = "Addressable" +) + +// GetCondition returns the condition currently associated with the given type, or nil. +func (ps *PipelineStatus) GetCondition(t apis.ConditionType) *apis.Condition { + return pCondSet.Manage(ps).GetCondition(t) +} + +// IsReady returns true if the resource is ready overall. +func (ps *PipelineStatus) IsReady() bool { + return pCondSet.Manage(ps).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (ps *PipelineStatus) InitializeConditions() { + pCondSet.Manage(ps).InitializeConditions() +} + +// PropagateSubscriptionStatuses sets the SubscriptionStatuses and PipelineConditionSubscriptionsReady based on +// the status of the incoming subscriptions. +func (ps *PipelineStatus) PropagateSubscriptionStatuses(subscriptions []*eventingv1alpha1.Subscription) { + ps.SubscriptionStatuses = make([]PipelineSubscriptionStatus, len(subscriptions)) + allReady := true + // If there are no subscriptions, treat that as a False case. Could go either way, but this seems right. + if len(subscriptions) == 0 { + allReady = false + + } + for i, s := range subscriptions { + ps.SubscriptionStatuses[i] = PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: s.APIVersion, + Kind: s.Kind, + Name: s.Name, + Namespace: s.Namespace, + }, + } + readyCondition := s.Status.GetCondition(eventingv1alpha1.SubscriptionConditionReady) + if readyCondition != nil { + ps.SubscriptionStatuses[i].ReadyCondition = *readyCondition + if readyCondition.Status != corev1.ConditionTrue { + allReady = false + } + } else { + allReady = false + } + + } + if allReady { + pCondSet.Manage(ps).MarkTrue(PipelineConditionSubscriptionsReady) + } else { + ps.MarkSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none") + } +} + +// PropagateChannelStatuses sets the ChannelStatuses and PipelineConditionChannelsReady based on the +// status of the incoming channels. +func (ps *PipelineStatus) PropagateChannelStatuses(channels []*duckv1alpha1.Channelable) { + ps.ChannelStatuses = make([]PipelineChannelStatus, len(channels)) + allReady := true + // If there are no channels, treat that as a False case. Could go either way, but this seems right. + if len(channels) == 0 { + allReady = false + + } + for i, c := range channels { + ps.ChannelStatuses[i] = PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: c.APIVersion, + Kind: c.Kind, + Name: c.Name, + Namespace: c.Namespace, + }, + } + // TODO: Once the addressable has a real status to dig through, use that here instead of + // addressable, because it might be addressable but not ready. + address := c.Status.AddressStatus.Address + if address != nil { + ps.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionTrue} + } else { + ps.ChannelStatuses[i].ReadyCondition = apis.Condition{Type: apis.ConditionReady, Status: corev1.ConditionFalse, Reason: "NotAddressable", Message: "Channel is not addressable"} + allReady = false + } + + // If the first channel is addressable, mark it as such + if i == 0 { + ps.setAddress(address) + } + } + if allReady { + pCondSet.Manage(ps).MarkTrue(PipelineConditionChannelsReady) + } else { + ps.MarkChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none") + } +} + +func (ps *PipelineStatus) MarkChannelsNotReady(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkFalse(PipelineConditionChannelsReady, reason, messageFormat, messageA...) +} + +func (ps *PipelineStatus) MarkSubscriptionsNotReady(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkFalse(PipelineConditionSubscriptionsReady, reason, messageFormat, messageA...) +} + +func (ps *PipelineStatus) MarkAddressableNotReady(reason, messageFormat string, messageA ...interface{}) { + pCondSet.Manage(ps).MarkFalse(PipelineConditionAddressable, reason, messageFormat, messageA...) +} + +// TODO: Use the new beta duck types. +func (ps *PipelineStatus) setAddress(address *pkgduckv1alpha1.Addressable) { + if address == nil { + ps.Address.Hostname = "" + ps.Address.URL = nil + pCondSet.Manage(ps).MarkFalse(PipelineConditionAddressable, "emptyHostname", "hostname is the empty string") + return + } + + if address.URL != nil { + ps.Address.Hostname = address.URL.Host + ps.Address.URL = address.URL + pCondSet.Manage(ps).MarkTrue(PipelineConditionAddressable) + } else if address.Hostname != "" { + ps.Address.Hostname = address.Hostname + pCondSet.Manage(ps).MarkTrue(PipelineConditionAddressable) + } else { + ps.Address.Hostname = "" + ps.Address.URL = nil + pCondSet.Manage(ps).MarkFalse(PipelineConditionAddressable, "emptyHostname", "hostname is the empty string") + } +} diff --git a/pkg/apis/messaging/v1alpha1/pipeline_lifecycle_test.go b/pkg/apis/messaging/v1alpha1/pipeline_lifecycle_test.go new file mode 100644 index 00000000000..a91cee89d3d --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/pipeline_lifecycle_test.go @@ -0,0 +1,410 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/pkg/apis" + + "github.com/google/go-cmp/cmp" + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + pkgduckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var pipelineConditionReady = apis.Condition{ + Type: PipelineConditionReady, + Status: corev1.ConditionTrue, +} + +var pipelineConditionChannelsReady = apis.Condition{ + Type: PipelineConditionChannelsReady, + Status: corev1.ConditionTrue, +} + +var pipelineConditionSubscriptionsReady = apis.Condition{ + Type: PipelineConditionSubscriptionsReady, + Status: corev1.ConditionTrue, +} + +func getSubscription(name string, ready bool) *eventingv1alpha1.Subscription { + s := eventingv1alpha1.Subscription{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "testns", + }, + Status: eventingv1alpha1.SubscriptionStatus{}, + } + if ready { + s.Status.MarkChannelReady() + s.Status.MarkReferencesResolved() + s.Status.MarkAddedToChannel() + } else { + s.Status.MarkChannelNotReady("testInducedFailure", "Test Induced failure") + s.Status.MarkReferencesNotResolved("testInducedFailure", "Test Induced failure") + s.Status.MarkNotAddedToChannel("testInducedfailure", "Test Induced failure") + } + return &s +} + +func getChannelable(ready bool) *duckv1alpha1.Channelable { + URL, _ := apis.ParseURL("http://example.com") + s := duckv1alpha1.Channelable{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "InMemoryChannel", + }, + ObjectMeta: metav1.ObjectMeta{}, + Status: duckv1alpha1.ChannelableStatus{}, + } + + if ready { + s.Status.Address = &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{URL}, ""} + } + + return &s +} + +func TestPipelineGetCondition(t *testing.T) { + tests := []struct { + name string + ss *PipelineStatus + condQuery apis.ConditionType + want *apis.Condition + }{{ + name: "single condition", + ss: &PipelineStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{ + pipelineConditionReady, + }, + }, + }, + condQuery: apis.ConditionReady, + want: &pipelineConditionReady, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.ss.GetCondition(test.condQuery) + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + } +} + +func TestPipelineInitializeConditions(t *testing.T) { + tests := []struct { + name string + ts *PipelineStatus + want *PipelineStatus + }{{ + name: "empty", + ts: &PipelineStatus{}, + want: &PipelineStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: PipelineConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionChannelsReady, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionSubscriptionsReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one false", + ts: &PipelineStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: PipelineConditionChannelsReady, + Status: corev1.ConditionFalse, + }}, + }, + }, + want: &PipelineStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: PipelineConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionChannelsReady, + Status: corev1.ConditionFalse, + }, { + Type: PipelineConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionSubscriptionsReady, + Status: corev1.ConditionUnknown, + }}, + }, + }, + }, { + name: "one true", + ts: &PipelineStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: PipelineConditionSubscriptionsReady, + Status: corev1.ConditionTrue, + }}, + }, + }, + want: &PipelineStatus{ + Status: duckv1beta1.Status{ + Conditions: []apis.Condition{{ + Type: PipelineConditionAddressable, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionChannelsReady, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionReady, + Status: corev1.ConditionUnknown, + }, { + Type: PipelineConditionSubscriptionsReady, + Status: corev1.ConditionTrue, + }}, + }, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + test.ts.InitializeConditions() + if diff := cmp.Diff(test.want, test.ts, ignoreAllButTypeAndStatus); diff != "" { + t.Errorf("unexpected conditions (-want, +got) = %v", diff) + } + }) + } +} + +func TestPipelinePropagateSubscriptionStatuses(t *testing.T) { + tests := []struct { + name string + subs []*eventingv1alpha1.Subscription + want corev1.ConditionStatus + }{{ + name: "empty", + subs: []*eventingv1alpha1.Subscription{}, + want: corev1.ConditionFalse, + }, { + name: "empty status", + subs: []*eventingv1alpha1.Subscription{&eventingv1alpha1.Subscription{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: eventingv1alpha1.SubscriptionStatus{}, + }, + }, + want: corev1.ConditionFalse, + }, { + name: "one subscription not ready", + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", false)}, + want: corev1.ConditionFalse, + }, { + name: "one subscription ready", + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: corev1.ConditionTrue, + }, { + name: "one subscription ready, one not", + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + want: corev1.ConditionFalse, + }, { + name: "two subscriptions ready", + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: corev1.ConditionTrue, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := PipelineStatus{} + ps.PropagateSubscriptionStatuses(test.subs) + got := ps.GetCondition(PipelineConditionSubscriptionsReady).Status + want := test.want + if want != got { + t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got) + } + }) + } +} + +func TestPipelinePropagateChannelStatuses(t *testing.T) { + tests := []struct { + name string + channels []*duckv1alpha1.Channelable + want corev1.ConditionStatus + }{{ + name: "empty", + channels: []*duckv1alpha1.Channelable{}, + want: corev1.ConditionFalse, + }, { + name: "one channelable not ready", + channels: []*duckv1alpha1.Channelable{getChannelable(false)}, + want: corev1.ConditionFalse, + }, { + name: "one channelable ready", + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + want: corev1.ConditionTrue, + }, { + name: "one channelable ready, one not", + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(false)}, + want: corev1.ConditionFalse, + }, { + name: "two channelables ready", + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + want: corev1.ConditionTrue, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := PipelineStatus{} + ps.PropagateChannelStatuses(test.channels) + got := ps.GetCondition(PipelineConditionChannelsReady).Status + want := test.want + if want != got { + t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got) + } + }) + } +} + +func TestPipelineReady(t *testing.T) { + tests := []struct { + name string + subs []*eventingv1alpha1.Subscription + channels []*duckv1alpha1.Channelable + want bool + }{{ + name: "empty", + subs: []*eventingv1alpha1.Subscription{}, + channels: []*duckv1alpha1.Channelable{}, + want: false, + }, { + name: "one channelable not ready, one subscription ready", + channels: []*duckv1alpha1.Channelable{getChannelable(false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: false, + }, { + name: "one channelable ready, one subscription not ready", + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", false)}, + want: false, + }, { + name: "one channelable ready, one subscription ready", + channels: []*duckv1alpha1.Channelable{getChannelable(true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true)}, + want: true, + }, { + name: "one channelable ready, one not, two subsriptions ready", + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(false)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: false, + }, { + name: "two channelables ready, one subscription ready, one not", + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + want: false, + }, { + name: "two channelables ready, two subscriptions ready", + channels: []*duckv1alpha1.Channelable{getChannelable(true), getChannelable(true)}, + subs: []*eventingv1alpha1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := PipelineStatus{} + ps.PropagateChannelStatuses(test.channels) + ps.PropagateSubscriptionStatuses(test.subs) + got := ps.IsReady() + want := test.want + if want != got { + t.Errorf("unexpected conditions (-want, +got) = %v %v", want, got) + } + }) + } +} + +func TestPipelinePropagateSetAddress(t *testing.T) { + URL, _ := apis.ParseURL("http://example.com") + tests := []struct { + name string + address *pkgduckv1alpha1.Addressable + want *pkgduckv1alpha1.Addressable + wantStatus corev1.ConditionStatus + }{{ + name: "nil", + address: nil, + want: &pkgduckv1alpha1.Addressable{}, + wantStatus: corev1.ConditionFalse, + }, { + name: "empty", + address: &pkgduckv1alpha1.Addressable{}, + want: &pkgduckv1alpha1.Addressable{}, + wantStatus: corev1.ConditionFalse, + }, { + name: "URL", + address: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{URL}, ""}, + want: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{URL}, "example.com"}, + wantStatus: corev1.ConditionTrue, + }, { + name: "hostname", + address: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{}, "myhostname"}, + want: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{}, "myhostname"}, + wantStatus: corev1.ConditionTrue, + }, { + name: "nil", + address: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{nil}, ""}, + want: &pkgduckv1alpha1.Addressable{duckv1beta1.Addressable{}, ""}, + wantStatus: corev1.ConditionFalse, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := PipelineStatus{} + ps.setAddress(test.address) + got := ps.Address + if diff := cmp.Diff(test.want, &got, ignoreAllButTypeAndStatus); diff != "" { + t.Errorf("unexpected address (-want, +got) = %v", diff) + } + gotStatus := ps.GetCondition(PipelineConditionAddressable).Status + if test.wantStatus != gotStatus { + t.Errorf("unexpected conditions (-want, +got) = %v %v", test.wantStatus, gotStatus) + } + }) + } +} diff --git a/pkg/apis/messaging/v1alpha1/pipeline_types.go b/pkg/apis/messaging/v1alpha1/pipeline_types.go new file mode 100644 index 00000000000..9d9f51c4ebe --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/pipeline_types.go @@ -0,0 +1,154 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package v1alpha1 + +import ( + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/pkg/apis" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" + "github.com/knative/pkg/webhook" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// Pipeline defines a sequence of Subscribers that will be wired in +// series through Channels and Subscriptions. +type Pipeline struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec defines the desired state of the Pipeline. + Spec PipelineSpec `json:"spec,omitempty"` + + // Status represents the current state of the Pipeline. This data may be out of + // date. + // +optional + Status PipelineStatus `json:"status,omitempty"` +} + +// Check that Pipeline can be validated, can be defaulted, and has immutable fields. +var _ apis.Validatable = (*Pipeline)(nil) +var _ apis.Defaultable = (*Pipeline)(nil) + +// TODO: make appropriate fields immutable. +//var _ apis.Immutable = (*Pipeline)(nil) +var _ runtime.Object = (*Pipeline)(nil) +var _ webhook.GenericCRD = (*Pipeline)(nil) + +// This should be duck so that Broker can also use this +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type ChannelTemplateSpec struct { + metav1.TypeMeta `json:",inline"` + + // Spec defines the Spec to use for each channel created. Passed + // in verbatim to the Channel CRD as Spec section. + // +optional + Spec runtime.RawExtension `json:"spec"` +} + +// Internal version of ChannelTemplateSpec that includes ObjectMeta so that +// we can easily create new Channels off of it. +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +type ChannelTemplateSpecInternal struct { + metav1.TypeMeta `json:",inline"` + + // +optional + metav1.ObjectMeta `json:"metadata,omitempty"` + + // Spec defines the Spec to use for each channel created. Passed + // in verbatim to the Channel CRD as Spec section. + // +optional + Spec runtime.RawExtension `json:"spec"` +} + +type PipelineSpec struct { + // Steps is the list of Subscribers (processors / functions) that will be called in the order + // provided. + Steps []eventingv1alpha1.SubscriberSpec `json:"steps"` + + // ChannelTemplate specifies which Channel CRD to use + ChannelTemplate ChannelTemplateSpec `json:"channelTemplate"` + + // Reply is a Reference to where the result of the last Subscriber gets sent to. + // + // You can specify only the following fields of the ObjectReference: + // - Kind + // - APIVersion + // - Name + // + // The resource pointed by this ObjectReference must meet the Addressable contract + // with a reference to the Addressable duck type. If the resource does not meet this contract, + // it will be reflected in the Subscription's status. + // +optional + Reply *corev1.ObjectReference `json:"reply,omitempty"` +} + +type PipelineChannelStatus struct { + // Channel is the reference to the underlying channel. + Channel corev1.ObjectReference `json:"channel"` + + // ReadyCondition indicates whether the Channel is ready or not. + ReadyCondition apis.Condition `json:"ready"` +} + +type PipelineSubscriptionStatus struct { + // Subscription is the reference to the underlying Subscription. + Subscription corev1.ObjectReference `json:"subscription"` + + // ReadyCondition indicates whether the Subscription is ready or not. + ReadyCondition apis.Condition `json:"ready"` +} + +// PipelineStatus represents the current state of a Pipeline. +type PipelineStatus struct { + // inherits duck/v1alpha1 Status, which currently provides: + // * ObservedGeneration - the 'Generation' of the Service that was last processed by the controller. + // * Conditions - the latest available observations of a resource's current state. + duckv1beta1.Status `json:",inline"` + + // SubscriptionStatuses is an array of corresponding Subscription statuses. + // Matches the Spec.Steps array in the order. + SubscriptionStatuses []PipelineSubscriptionStatus + + // ChannelStatuses is an array of corresponding Channel statuses. + // Matches the Spec.Steps array in the order. + ChannelStatuses []PipelineChannelStatus + + // Addressable is the starting point to this Pipeline. Sending to this will target the first Subscriber. + Address duckv1alpha1.Addressable `json:"address,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// PipelineList is a collection of Pipelines. +type PipelineList struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + Items []Pipeline `json:"items"` +} + +// GetGroupVersionKind returns GroupVersionKind for InMemoryChannels +func (p *Pipeline) GetGroupVersionKind() schema.GroupVersionKind { + return SchemeGroupVersion.WithKind("Pipeline") +} diff --git a/pkg/apis/messaging/v1alpha1/pipeline_validation.go b/pkg/apis/messaging/v1alpha1/pipeline_validation.go new file mode 100644 index 00000000000..8d9cc4acc3b --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/pipeline_validation.go @@ -0,0 +1,62 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/pkg/apis" + "k8s.io/apimachinery/pkg/api/equality" +) + +func (p *Pipeline) Validate(ctx context.Context) *apis.FieldError { + return p.Spec.Validate(ctx).ViaField("spec") +} + +func (ps *PipelineSpec) Validate(ctx context.Context) *apis.FieldError { + var errs *apis.FieldError + + if len(ps.Steps) == 0 { + errs = errs.Also(apis.ErrMissingField("steps")) + } + + for i, s := range ps.Steps { + if e := eventingv1alpha1.IsValidSubscriberSpec(s); e != nil { + errs = errs.Also(apis.ErrInvalidArrayValue(s, "steps", i)) + } + } + + if equality.Semantic.DeepEqual(ps.ChannelTemplate, ChannelTemplateSpec{}) { + errs = errs.Also(apis.ErrMissingField("channelTemplate")) + return errs + } + + if len(ps.ChannelTemplate.APIVersion) == 0 { + errs = errs.Also(apis.ErrMissingField("channelTemplate.apiVersion")) + } + + if len(ps.ChannelTemplate.Kind) == 0 { + errs = errs.Also(apis.ErrMissingField("channelTemplate.kind")) + } + if ps.Reply != nil { + if err := eventingv1alpha1.IsValidObjectReference(*ps.Reply); err != nil { + errs = errs.Also(err.ViaField("reply")) + } + } + return errs +} diff --git a/pkg/apis/messaging/v1alpha1/pipeline_validation_test.go b/pkg/apis/messaging/v1alpha1/pipeline_validation_test.go new file mode 100644 index 00000000000..ce04fbc1ae2 --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/pipeline_validation_test.go @@ -0,0 +1,178 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/pkg/apis" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +func TestPipelineValidation(t *testing.T) { + name := "invalid pipeline spec" + pipeline := &Pipeline{Spec: PipelineSpec{}} + + want := &apis.FieldError{ + Paths: []string{"spec.channelTemplate", "spec.steps"}, + Message: "missing field(s)", + } + + t.Run(name, func(t *testing.T) { + got := pipeline.Validate(context.TODO()) + if diff := cmp.Diff(want.Error(), got.Error()); diff != "" { + t.Errorf("Pipeline.Validate (-want, +got) = %v", diff) + } + }) +} + +func makeValidReply(channelName string) *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: channelName, + } +} + +func makeInvalidReply(channelName string) *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Namespace: "notallowed", + Name: channelName, + } +} + +func TestPipelineSpecValidation(t *testing.T) { + subscriberURI := "http://example.com" + validChannelTemplate := ChannelTemplateSpec{ + metav1.TypeMeta{ + Kind: "mykind", + APIVersion: "myapiversion", + }, + runtime.RawExtension{}, + } + tests := []struct { + name string + ts *PipelineSpec + want *apis.FieldError + }{{ + name: "invalid pipeline spec - empty", + ts: &PipelineSpec{}, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate", "steps") + return fe + }(), + }, { + name: "invalid pipeline spec - empty steps", + ts: &PipelineSpec{ + ChannelTemplate: validChannelTemplate, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("steps") + return fe + }(), + }, { + name: "missing channeltemplatespec", + ts: &PipelineSpec{ + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate") + return fe + }(), + }, { + name: "invalid channeltemplatespec missing APIVersion", + ts: &PipelineSpec{ + ChannelTemplate: ChannelTemplateSpec{metav1.TypeMeta{Kind: "mykind"}, runtime.RawExtension{}}, + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate.apiVersion") + return fe + }(), + }, { + name: "invalid channeltemplatespec missing Kind", + ts: &PipelineSpec{ + ChannelTemplate: ChannelTemplateSpec{metav1.TypeMeta{APIVersion: "myapiversion"}, runtime.RawExtension{}}, + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("channelTemplate.kind") + return fe + }(), + }, { + name: "valid pipeline", + ts: &PipelineSpec{ + ChannelTemplate: validChannelTemplate, + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + }, + want: func() *apis.FieldError { + return nil + }(), + }, { + name: "valid pipeline with valid reply", + ts: &PipelineSpec{ + ChannelTemplate: validChannelTemplate, + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + Reply: makeValidReply("reply-channel"), + }, + want: func() *apis.FieldError { + return nil + }(), + }, { + name: "valid pipeline with invalid missing name", + ts: &PipelineSpec{ + ChannelTemplate: validChannelTemplate, + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + Reply: &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + }, + }, + want: func() *apis.FieldError { + fe := apis.ErrMissingField("reply.name") + return fe + }(), + }, { + name: "valid pipeline with invalid reply", + ts: &PipelineSpec{ + ChannelTemplate: validChannelTemplate, + Steps: []eventingv1alpha1.SubscriberSpec{{URI: &subscriberURI}}, + Reply: makeInvalidReply("reply-channel"), + }, + want: func() *apis.FieldError { + fe := apis.ErrDisallowedFields("reply.Namespace") + fe.Details = "only name, apiVersion and kind are supported fields" + return fe + }(), + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.ts.Validate(context.TODO()) + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("%s: Validate PipelineSpec (-want, +got) = %v", test.name, diff) + } + }) + } +} diff --git a/pkg/apis/messaging/v1alpha1/register.go b/pkg/apis/messaging/v1alpha1/register.go index bd7d4163d65..ad261af27cc 100644 --- a/pkg/apis/messaging/v1alpha1/register.go +++ b/pkg/apis/messaging/v1alpha1/register.go @@ -47,6 +47,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &InMemoryChannel{}, &InMemoryChannelList{}, + &Pipeline{}, + &PipelineList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index 1ab4cf03025..f75d64954e8 100644 --- a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -22,9 +22,64 @@ package v1alpha1 import ( duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelTemplateSpec) DeepCopyInto(out *ChannelTemplateSpec) { + *out = *in + out.TypeMeta = in.TypeMeta + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelTemplateSpec. +func (in *ChannelTemplateSpec) DeepCopy() *ChannelTemplateSpec { + if in == nil { + return nil + } + out := new(ChannelTemplateSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ChannelTemplateSpec) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelTemplateSpecInternal) DeepCopyInto(out *ChannelTemplateSpecInternal) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelTemplateSpecInternal. +func (in *ChannelTemplateSpecInternal) DeepCopy() *ChannelTemplateSpecInternal { + if in == nil { + return nil + } + out := new(ChannelTemplateSpecInternal) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ChannelTemplateSpecInternal) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InMemoryChannel) DeepCopyInto(out *InMemoryChannel) { *out = *in @@ -124,3 +179,161 @@ func (in *InMemoryChannelStatus) DeepCopy() *InMemoryChannelStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Pipeline) DeepCopyInto(out *Pipeline) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Pipeline. +func (in *Pipeline) DeepCopy() *Pipeline { + if in == nil { + return nil + } + out := new(Pipeline) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Pipeline) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineChannelStatus) DeepCopyInto(out *PipelineChannelStatus) { + *out = *in + out.Channel = in.Channel + in.ReadyCondition.DeepCopyInto(&out.ReadyCondition) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineChannelStatus. +func (in *PipelineChannelStatus) DeepCopy() *PipelineChannelStatus { + if in == nil { + return nil + } + out := new(PipelineChannelStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineList) DeepCopyInto(out *PipelineList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Pipeline, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineList. +func (in *PipelineList) DeepCopy() *PipelineList { + if in == nil { + return nil + } + out := new(PipelineList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PipelineList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineSpec) DeepCopyInto(out *PipelineSpec) { + *out = *in + if in.Steps != nil { + in, out := &in.Steps, &out.Steps + *out = make([]eventingv1alpha1.SubscriberSpec, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + in.ChannelTemplate.DeepCopyInto(&out.ChannelTemplate) + if in.Reply != nil { + in, out := &in.Reply, &out.Reply + *out = new(v1.ObjectReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineSpec. +func (in *PipelineSpec) DeepCopy() *PipelineSpec { + if in == nil { + return nil + } + out := new(PipelineSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineStatus) DeepCopyInto(out *PipelineStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + if in.SubscriptionStatuses != nil { + in, out := &in.SubscriptionStatuses, &out.SubscriptionStatuses + *out = make([]PipelineSubscriptionStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.ChannelStatuses != nil { + in, out := &in.ChannelStatuses, &out.ChannelStatuses + *out = make([]PipelineChannelStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + in.Address.DeepCopyInto(&out.Address) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineStatus. +func (in *PipelineStatus) DeepCopy() *PipelineStatus { + if in == nil { + return nil + } + out := new(PipelineStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PipelineSubscriptionStatus) DeepCopyInto(out *PipelineSubscriptionStatus) { + *out = *in + out.Subscription = in.Subscription + in.ReadyCondition.DeepCopyInto(&out.ReadyCondition) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PipelineSubscriptionStatus. +func (in *PipelineSubscriptionStatus) DeepCopy() *PipelineSubscriptionStatus { + if in == nil { + return nil + } + out := new(PipelineSubscriptionStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go index 749d241e42d..9a8159f46fb 100644 --- a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_messaging_client.go @@ -32,6 +32,10 @@ func (c *FakeMessagingV1alpha1) InMemoryChannels(namespace string) v1alpha1.InMe return &FakeInMemoryChannels{c, namespace} } +func (c *FakeMessagingV1alpha1) Pipelines(namespace string) v1alpha1.PipelineInterface { + return &FakePipelines{c, namespace} +} + // RESTClient returns a RESTClient that is used to communicate // with API server by this client implementation. func (c *FakeMessagingV1alpha1) RESTClient() rest.Interface { diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_pipeline.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_pipeline.go new file mode 100644 index 00000000000..a09a90b64ae --- /dev/null +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/fake/fake_pipeline.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakePipelines implements PipelineInterface +type FakePipelines struct { + Fake *FakeMessagingV1alpha1 + ns string +} + +var pipelinesResource = schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1alpha1", Resource: "pipelines"} + +var pipelinesKind = schema.GroupVersionKind{Group: "messaging.knative.dev", Version: "v1alpha1", Kind: "Pipeline"} + +// Get takes name of the pipeline, and returns the corresponding pipeline object, and an error if there is any. +func (c *FakePipelines) Get(name string, options v1.GetOptions) (result *v1alpha1.Pipeline, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(pipelinesResource, c.ns, name), &v1alpha1.Pipeline{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Pipeline), err +} + +// List takes label and field selectors, and returns the list of Pipelines that match those selectors. +func (c *FakePipelines) List(opts v1.ListOptions) (result *v1alpha1.PipelineList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(pipelinesResource, pipelinesKind, c.ns, opts), &v1alpha1.PipelineList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.PipelineList{ListMeta: obj.(*v1alpha1.PipelineList).ListMeta} + for _, item := range obj.(*v1alpha1.PipelineList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested pipelines. +func (c *FakePipelines) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(pipelinesResource, c.ns, opts)) + +} + +// Create takes the representation of a pipeline and creates it. Returns the server's representation of the pipeline, and an error, if there is any. +func (c *FakePipelines) Create(pipeline *v1alpha1.Pipeline) (result *v1alpha1.Pipeline, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(pipelinesResource, c.ns, pipeline), &v1alpha1.Pipeline{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Pipeline), err +} + +// Update takes the representation of a pipeline and updates it. Returns the server's representation of the pipeline, and an error, if there is any. +func (c *FakePipelines) Update(pipeline *v1alpha1.Pipeline) (result *v1alpha1.Pipeline, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(pipelinesResource, c.ns, pipeline), &v1alpha1.Pipeline{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Pipeline), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakePipelines) UpdateStatus(pipeline *v1alpha1.Pipeline) (*v1alpha1.Pipeline, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(pipelinesResource, "status", c.ns, pipeline), &v1alpha1.Pipeline{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Pipeline), err +} + +// Delete takes name of the pipeline and deletes it. Returns an error if one occurs. +func (c *FakePipelines) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(pipelinesResource, c.ns, name), &v1alpha1.Pipeline{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakePipelines) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(pipelinesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.PipelineList{}) + return err +} + +// Patch applies the patch and returns the patched pipeline. +func (c *FakePipelines) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Pipeline, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(pipelinesResource, c.ns, name, data, subresources...), &v1alpha1.Pipeline{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Pipeline), err +} diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go index ed2a76fd1e7..e675ed43ee5 100644 --- a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/generated_expansion.go @@ -19,3 +19,5 @@ limitations under the License. package v1alpha1 type InMemoryChannelExpansion interface{} + +type PipelineExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go index 2a8987e43a3..9154f211c57 100644 --- a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/messaging_client.go @@ -28,6 +28,7 @@ import ( type MessagingV1alpha1Interface interface { RESTClient() rest.Interface InMemoryChannelsGetter + PipelinesGetter } // MessagingV1alpha1Client is used to interact with features provided by the messaging.knative.dev group. @@ -39,6 +40,10 @@ func (c *MessagingV1alpha1Client) InMemoryChannels(namespace string) InMemoryCha return newInMemoryChannels(c, namespace) } +func (c *MessagingV1alpha1Client) Pipelines(namespace string) PipelineInterface { + return newPipelines(c, namespace) +} + // NewForConfig creates a new MessagingV1alpha1Client for the given config. func NewForConfig(c *rest.Config) (*MessagingV1alpha1Client, error) { config := *c diff --git a/pkg/client/clientset/versioned/typed/messaging/v1alpha1/pipeline.go b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/pipeline.go new file mode 100644 index 00000000000..abafb1dffe9 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/messaging/v1alpha1/pipeline.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// PipelinesGetter has a method to return a PipelineInterface. +// A group's client should implement this interface. +type PipelinesGetter interface { + Pipelines(namespace string) PipelineInterface +} + +// PipelineInterface has methods to work with Pipeline resources. +type PipelineInterface interface { + Create(*v1alpha1.Pipeline) (*v1alpha1.Pipeline, error) + Update(*v1alpha1.Pipeline) (*v1alpha1.Pipeline, error) + UpdateStatus(*v1alpha1.Pipeline) (*v1alpha1.Pipeline, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.Pipeline, error) + List(opts v1.ListOptions) (*v1alpha1.PipelineList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Pipeline, err error) + PipelineExpansion +} + +// pipelines implements PipelineInterface +type pipelines struct { + client rest.Interface + ns string +} + +// newPipelines returns a Pipelines +func newPipelines(c *MessagingV1alpha1Client, namespace string) *pipelines { + return &pipelines{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the pipeline, and returns the corresponding pipeline object, and an error if there is any. +func (c *pipelines) Get(name string, options v1.GetOptions) (result *v1alpha1.Pipeline, err error) { + result = &v1alpha1.Pipeline{} + err = c.client.Get(). + Namespace(c.ns). + Resource("pipelines"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of Pipelines that match those selectors. +func (c *pipelines) List(opts v1.ListOptions) (result *v1alpha1.PipelineList, err error) { + result = &v1alpha1.PipelineList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("pipelines"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested pipelines. +func (c *pipelines) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("pipelines"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a pipeline and creates it. Returns the server's representation of the pipeline, and an error, if there is any. +func (c *pipelines) Create(pipeline *v1alpha1.Pipeline) (result *v1alpha1.Pipeline, err error) { + result = &v1alpha1.Pipeline{} + err = c.client.Post(). + Namespace(c.ns). + Resource("pipelines"). + Body(pipeline). + Do(). + Into(result) + return +} + +// Update takes the representation of a pipeline and updates it. Returns the server's representation of the pipeline, and an error, if there is any. +func (c *pipelines) Update(pipeline *v1alpha1.Pipeline) (result *v1alpha1.Pipeline, err error) { + result = &v1alpha1.Pipeline{} + err = c.client.Put(). + Namespace(c.ns). + Resource("pipelines"). + Name(pipeline.Name). + Body(pipeline). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *pipelines) UpdateStatus(pipeline *v1alpha1.Pipeline) (result *v1alpha1.Pipeline, err error) { + result = &v1alpha1.Pipeline{} + err = c.client.Put(). + Namespace(c.ns). + Resource("pipelines"). + Name(pipeline.Name). + SubResource("status"). + Body(pipeline). + Do(). + Into(result) + return +} + +// Delete takes name of the pipeline and deletes it. Returns an error if one occurs. +func (c *pipelines) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("pipelines"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *pipelines) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("pipelines"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched pipeline. +func (c *pipelines) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Pipeline, err error) { + result = &v1alpha1.Pipeline{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("pipelines"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 5666ee78315..589efe24d91 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -71,6 +71,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=messaging.knative.dev, Version=v1alpha1 case messagingv1alpha1.SchemeGroupVersion.WithResource("inmemorychannels"): return &genericInformer{resource: resource.GroupResource(), informer: f.Messaging().V1alpha1().InMemoryChannels().Informer()}, nil + case messagingv1alpha1.SchemeGroupVersion.WithResource("pipelines"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Messaging().V1alpha1().Pipelines().Informer()}, nil // Group=sources.eventing.knative.dev, Version=v1alpha1 case sourcesv1alpha1.SchemeGroupVersion.WithResource("apiserversources"): diff --git a/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go b/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go index 60cbc3d2b54..55664d25a86 100644 --- a/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/messaging/v1alpha1/interface.go @@ -26,6 +26,8 @@ import ( type Interface interface { // InMemoryChannels returns a InMemoryChannelInformer. InMemoryChannels() InMemoryChannelInformer + // Pipelines returns a PipelineInformer. + Pipelines() PipelineInformer } type version struct { @@ -43,3 +45,8 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList func (v *version) InMemoryChannels() InMemoryChannelInformer { return &inMemoryChannelInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } + +// Pipelines returns a PipelineInformer. +func (v *version) Pipelines() PipelineInformer { + return &pipelineInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/pkg/client/informers/externalversions/messaging/v1alpha1/pipeline.go b/pkg/client/informers/externalversions/messaging/v1alpha1/pipeline.go new file mode 100644 index 00000000000..7cd86b3bca4 --- /dev/null +++ b/pkg/client/informers/externalversions/messaging/v1alpha1/pipeline.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + messagingv1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// PipelineInformer provides access to a shared informer and lister for +// Pipelines. +type PipelineInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.PipelineLister +} + +type pipelineInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewPipelineInformer constructs a new informer for Pipeline type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewPipelineInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredPipelineInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredPipelineInformer constructs a new informer for Pipeline type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredPipelineInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.MessagingV1alpha1().Pipelines(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.MessagingV1alpha1().Pipelines(namespace).Watch(options) + }, + }, + &messagingv1alpha1.Pipeline{}, + resyncPeriod, + indexers, + ) +} + +func (f *pipelineInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredPipelineInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *pipelineInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&messagingv1alpha1.Pipeline{}, f.defaultInformer) +} + +func (f *pipelineInformer) Lister() v1alpha1.PipelineLister { + return v1alpha1.NewPipelineLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/injection/informers/messaging/v1alpha1/pipeline/fake/fake.go b/pkg/client/injection/informers/messaging/v1alpha1/pipeline/fake/fake.go new file mode 100644 index 00000000000..e3e0a94465e --- /dev/null +++ b/pkg/client/injection/informers/messaging/v1alpha1/pipeline/fake/fake.go @@ -0,0 +1,40 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + fake "github.com/knative/eventing/pkg/client/injection/informers/messaging/factory/fake" + pipeline "github.com/knative/eventing/pkg/client/injection/informers/messaging/v1alpha1/pipeline" + controller "github.com/knative/pkg/controller" + injection "github.com/knative/pkg/injection" +) + +var Get = pipeline.Get + +func init() { + injection.Fake.RegisterInformer(withInformer) +} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := fake.Get(ctx) + inf := f.Messaging().V1alpha1().Pipelines() + return context.WithValue(ctx, pipeline.Key{}, inf), inf.Informer() +} diff --git a/pkg/client/injection/informers/messaging/v1alpha1/pipeline/pipeline.go b/pkg/client/injection/informers/messaging/v1alpha1/pipeline/pipeline.go new file mode 100644 index 00000000000..bd7dc176353 --- /dev/null +++ b/pkg/client/injection/informers/messaging/v1alpha1/pipeline/pipeline.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pipeline + +import ( + "context" + + v1alpha1 "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" + factory "github.com/knative/eventing/pkg/client/injection/informers/messaging/factory" + controller "github.com/knative/pkg/controller" + injection "github.com/knative/pkg/injection" + logging "github.com/knative/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Messaging().V1alpha1().Pipelines() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1alpha1.PipelineInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Fatalf( + "Unable to fetch %T from context.", (v1alpha1.PipelineInformer)(nil)) + } + return untyped.(v1alpha1.PipelineInformer) +} diff --git a/pkg/client/listers/messaging/v1alpha1/expansion_generated.go b/pkg/client/listers/messaging/v1alpha1/expansion_generated.go index 8eaaa98e243..02221b279fd 100644 --- a/pkg/client/listers/messaging/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/messaging/v1alpha1/expansion_generated.go @@ -25,3 +25,11 @@ type InMemoryChannelListerExpansion interface{} // InMemoryChannelNamespaceListerExpansion allows custom methods to be added to // InMemoryChannelNamespaceLister. type InMemoryChannelNamespaceListerExpansion interface{} + +// PipelineListerExpansion allows custom methods to be added to +// PipelineLister. +type PipelineListerExpansion interface{} + +// PipelineNamespaceListerExpansion allows custom methods to be added to +// PipelineNamespaceLister. +type PipelineNamespaceListerExpansion interface{} diff --git a/pkg/client/listers/messaging/v1alpha1/pipeline.go b/pkg/client/listers/messaging/v1alpha1/pipeline.go new file mode 100644 index 00000000000..4424b0df6ae --- /dev/null +++ b/pkg/client/listers/messaging/v1alpha1/pipeline.go @@ -0,0 +1,94 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// PipelineLister helps list Pipelines. +type PipelineLister interface { + // List lists all Pipelines in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.Pipeline, err error) + // Pipelines returns an object that can list and get Pipelines. + Pipelines(namespace string) PipelineNamespaceLister + PipelineListerExpansion +} + +// pipelineLister implements the PipelineLister interface. +type pipelineLister struct { + indexer cache.Indexer +} + +// NewPipelineLister returns a new PipelineLister. +func NewPipelineLister(indexer cache.Indexer) PipelineLister { + return &pipelineLister{indexer: indexer} +} + +// List lists all Pipelines in the indexer. +func (s *pipelineLister) List(selector labels.Selector) (ret []*v1alpha1.Pipeline, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Pipeline)) + }) + return ret, err +} + +// Pipelines returns an object that can list and get Pipelines. +func (s *pipelineLister) Pipelines(namespace string) PipelineNamespaceLister { + return pipelineNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// PipelineNamespaceLister helps list and get Pipelines. +type PipelineNamespaceLister interface { + // List lists all Pipelines in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.Pipeline, err error) + // Get retrieves the Pipeline from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.Pipeline, error) + PipelineNamespaceListerExpansion +} + +// pipelineNamespaceLister implements the PipelineNamespaceLister +// interface. +type pipelineNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all Pipelines in the indexer for a given namespace. +func (s pipelineNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Pipeline, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Pipeline)) + }) + return ret, err +} + +// Get retrieves the Pipeline from the indexer for a given namespace and name. +func (s pipelineNamespaceLister) Get(name string) (*v1alpha1.Pipeline, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("pipeline"), name) + } + return obj.(*v1alpha1.Pipeline), nil +} diff --git a/pkg/reconciler/pipeline/pipeline.go b/pkg/reconciler/pipeline/pipeline.go new file mode 100644 index 00000000000..f18562c180c --- /dev/null +++ b/pkg/reconciler/pipeline/pipeline.go @@ -0,0 +1,290 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipeline + +import ( + "context" + "errors" + "fmt" + "reflect" + + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" + + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + eventinginformers "github.com/knative/eventing/pkg/client/informers/externalversions/eventing/v1alpha1" + informers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" + eventinglisters "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" + "github.com/knative/eventing/pkg/duck" + "github.com/knative/eventing/pkg/logging" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/pipeline/resources" + "github.com/knative/eventing/pkg/utils" + duckroot "github.com/knative/pkg/apis" + duckapis "github.com/knative/pkg/apis/duck" + "github.com/knative/pkg/controller" + "github.com/knative/pkg/tracker" + "go.uber.org/zap" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/dynamic" +) + +const ( + // ReconcilerName is the name of the reconciler + ReconcilerName = "Pipelines" + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "pipeline-controller" + + reconciled = "Reconciled" + reconcileFailed = "ReconcileFailed" + updateStatusFailed = "UpdateStatusFailed" +) + +type Reconciler struct { + *reconciler.Base + + // listers index properties about resources + pipelineLister listers.PipelineLister + tracker tracker.Interface + addressableInformer duck.AddressableInformer + subscriptionLister eventinglisters.SubscriptionLister +} + +// Check that our Reconciler implements controller.Reconciler +var _ controller.Reconciler = (*Reconciler)(nil) + +// NewController initializes the controller and is called by the generated code +// Registers event handlers to enqueue events +func NewController( + opt reconciler.Options, + pipelineInformer informers.PipelineInformer, + addressableInformer duck.AddressableInformer, + subscriptionInformer eventinginformers.SubscriptionInformer, +) *controller.Impl { + + r := &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + pipelineLister: pipelineInformer.Lister(), + addressableInformer: addressableInformer, + subscriptionLister: subscriptionInformer.Lister(), + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName) + + r.Logger.Info("Setting up event handlers") + + r.tracker = tracker.New(impl.EnqueueKey, opt.GetTrackerLease()) + pipelineInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + + // Register handler for Subscriptions that are owned by Pipeline, so that + // we get notified if they change. + subscriptionInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("Pipeline")), + Handler: controller.HandleAll(impl.EnqueueControllerOf), + }) + + return impl +} + +// Reconcile compares the actual state with the desired, and attempts to +// reconcile the two. It then updates the Status block of the Pipeline resource +// with the current Status of the resource. +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + logging.FromContext(ctx).Debug("reconciling", zap.String("key", key)) + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + logging.FromContext(ctx).Error("invalid resource key", zap.Error(err)) + return nil + } + + // Get the Pipeline resource with this namespace/name + original, err := r.pipelineLister.Pipelines(namespace).Get(name) + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("Pipeline key in work queue no longer exists") + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy + pipeline := original.DeepCopy() + + // Reconcile this copy of the Pipeline and then write back any status + // updates regardless of whether the reconcile error out. + reconcileErr := r.reconcile(ctx, pipeline) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling Pipeline", zap.Error(reconcileErr)) + r.Recorder.Eventf(pipeline, corev1.EventTypeWarning, reconcileFailed, "Pipeline reconciliation failed: %v", reconcileErr) + } else { + logging.FromContext(ctx).Debug("Successfully reconciled Pipeline") + r.Recorder.Eventf(pipeline, corev1.EventTypeNormal, reconciled, "Pipeline reconciled") + } + + if _, updateStatusErr := r.updateStatus(ctx, pipeline); updateStatusErr != nil { + logging.FromContext(ctx).Warn("Error updating Pipeline status", zap.Error(updateStatusErr)) + r.Recorder.Eventf(pipeline, corev1.EventTypeWarning, updateStatusFailed, "Failed to update pipeline status: %s", key) + return updateStatusErr + } + + // Requeue if the resource is not ready: + return reconcileErr +} + +func (r *Reconciler) reconcile(ctx context.Context, p *v1alpha1.Pipeline) error { + p.Status.InitializeConditions() + + // Reconciling pipeline is pretty straightforward, it does the following things: + // 1. Create a channel fronting the whole pipeline + // 2. For each of the Steps, create a Subscription to the previous Channel + // (hence the first step above for the first step in the "steps"), where the Subscriber points to the + // Step, and create intermediate channel for feeding the Reply to (if we allow Reply to be something else + // than channel, we could just (optionally) feed it directly to the following subscription. + // 3. Rinse and repeat step #2 above for each Step in the list + // 4. If there's a Reply, then the last Subscription will be configured to send the reply to that. + if p.DeletionTimestamp != nil { + // Everything is cleaned up by the garbage collector. + return nil + } + + channelResourceInterface := r.DynamicClientSet.Resource(duckroot.KindToResource(p.Spec.ChannelTemplate.GetObjectKind().GroupVersionKind())).Namespace(p.Namespace) + + if channelResourceInterface == nil { + msg := fmt.Sprintf("Unable to create dynamic client for: %+v", p.Spec.ChannelTemplate) + logging.FromContext(ctx).Error(msg) + return errors.New(msg) + } + + // Tell tracker to reconcile this Pipeline whenever my channels change. + track := r.addressableInformer.TrackInNamespace(r.tracker, p) + + channels := make([]*duckv1alpha1.Channelable, 0, len(p.Spec.Steps)) + for i := 0; i < len(p.Spec.Steps); i++ { + ingressChannelName := resources.PipelineChannelName(p.Name, i) + c, err := r.reconcileChannel(ctx, ingressChannelName, channelResourceInterface, p) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to reconcile Channel Object: %s/%s", p.Namespace, ingressChannelName), zap.Error(err)) + return err + + } + // Convert to Channel duck so that we can treat all Channels the same. + channelable := &duckv1alpha1.Channelable{} + err = duckapis.FromUnstructured(c, channelable) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", p.Namespace, ingressChannelName), zap.Error(err)) + return err + + } + // Track channels and enqueue pipeline when they change. + channels = append(channels, channelable) + if err = track(utils.ObjectRef(channelable, channelable.GroupVersionKind())); err != nil { + logging.FromContext(ctx).Error("Unable to track changes to Channel", zap.Error(err)) + return err + } + logging.FromContext(ctx).Info(fmt.Sprintf("Reconciled Channel Object: %s/%s %+v", p.Namespace, ingressChannelName, c)) + } + p.Status.PropagateChannelStatuses(channels) + + subs := make([]*eventingv1alpha1.Subscription, 0, len(p.Spec.Steps)) + for i := 0; i < len(p.Spec.Steps); i++ { + sub, err := r.reconcileSubscription(ctx, i, p) + if err != nil { + return fmt.Errorf("Failed to reconcile Subscription Object for step: %d : %s", i, err) + } + subs = append(subs, sub) + logging.FromContext(ctx).Debug(fmt.Sprintf("Reconciled Subscription Object for step: %d: %+v", i, sub)) + } + p.Status.PropagateSubscriptionStatuses(subs) + + return nil +} + +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.Pipeline) (*v1alpha1.Pipeline, error) { + p, err := r.pipelineLister.Pipelines(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + // If there's nothing to update, just return. + if reflect.DeepEqual(p.Status, desired.Status) { + return p, nil + } + + // Don't modify the informers copy. + existing := p.DeepCopy() + existing.Status = desired.Status + + return r.EventingClientSet.MessagingV1alpha1().Pipelines(desired.Namespace).UpdateStatus(existing) +} + +func (r *Reconciler) reconcileChannel(ctx context.Context, channelName string, channelResourceInterface dynamic.ResourceInterface, p *v1alpha1.Pipeline) (*unstructured.Unstructured, error) { + c, err := channelResourceInterface.Get(channelName, metav1.GetOptions{}) + if err != nil { + if apierrs.IsNotFound(err) { + newChannel, err := resources.NewChannel(channelName, p) + logging.FromContext(ctx).Error(fmt.Sprintf("Creating Channel Object: %+v", newChannel)) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to create Channel resource object: %s/%s", p.Namespace, channelName), zap.Error(err)) + return nil, err + } + created, err := channelResourceInterface.Create(newChannel, metav1.CreateOptions{}) + if err != nil { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to create Channel: %s/%s", p.Namespace, channelName), zap.Error(err)) + return nil, err + } + logging.FromContext(ctx).Info(fmt.Sprintf("Created Channel: %s/%s", p.Namespace, channelName), zap.Any("NewChannel", newChannel)) + return created, nil + } else { + logging.FromContext(ctx).Error(fmt.Sprintf("Failed to get Channel: %s/%s", p.Namespace, channelName), zap.Error(err)) + return nil, err + } + } + logging.FromContext(ctx).Debug(fmt.Sprintf("Found Channel: %s/%s", p.Namespace, channelName), zap.Any("NewChannel", c)) + return c, nil +} + +func (r *Reconciler) reconcileSubscription(ctx context.Context, step int, p *v1alpha1.Pipeline) (*eventingv1alpha1.Subscription, error) { + expected := resources.NewSubscription(step, p) + + subName := resources.PipelineSubscriptionName(p.Name, step) + sub, err := r.subscriptionLister.Subscriptions(p.Namespace).Get(subName) + + // If the resource doesn't exist, we'll create it. + if apierrs.IsNotFound(err) { + sub = expected + logging.FromContext(ctx).Info(fmt.Sprintf("Creating subscription: %+v", sub)) + newSub, err := r.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Create(sub) + if err != nil { + // TODO: Send events here, or elsewhere? + //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Pipeline's subscription failed: %v", err) + return nil, fmt.Errorf("Failed to create Subscription Object for step: %d : %s", step, err) + } + return newSub, nil + } else if err != nil { + logging.FromContext(ctx).Error("Failed to get subscription", zap.Error(err)) + // TODO: Send events here, or elsewhere? + //r.Recorder.Eventf(p, corev1.EventTypeWarning, subscriptionCreateFailed, "Create Pipelines's subscription failed: %v", err) + return nil, fmt.Errorf("Failed to get subscription: %s", err) + } + return sub, nil +} diff --git a/pkg/reconciler/pipeline/pipeline_test.go b/pkg/reconciler/pipeline/pipeline_test.go new file mode 100644 index 00000000000..fc714db2e49 --- /dev/null +++ b/pkg/reconciler/pipeline/pipeline_test.go @@ -0,0 +1,518 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pipeline + +import ( + "fmt" + "testing" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/pipeline/resources" + reconciletesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/knative/pkg/apis" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + . "github.com/knative/pkg/reconciler/testing" + "github.com/knative/pkg/tracker" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" +) + +const ( + testNS = "test-namespace" + pipelineName = "test-pipeline" + pipelineUID = "test-pipeline-uid" + replyChannelName = "reply-channel" +) + +func init() { + // Add types to scheme + _ = v1alpha1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +type fakeAddressableInformer struct{} + +func (*fakeAddressableInformer) TrackInNamespace(tracker.Interface, metav1.Object) func(corev1.ObjectReference) error { + return func(corev1.ObjectReference) error { return nil } +} + +func TestNewController(t *testing.T) { + kubeClient := fakekubeclientset.NewSimpleClientset() + eventingClient := fakeclientset.NewSimpleClientset() + + // Create informer factories with fake clients. The second parameter sets the + // resync period to zero, disabling it. + eventingInformerFactory := informers.NewSharedInformerFactory(eventingClient, 0) + + // Messaging + pipelineInformer := eventingInformerFactory.Messaging().V1alpha1().Pipelines() + + // Eventing + subscriptionInformer := eventingInformerFactory.Eventing().V1alpha1().Subscriptions() + + c := NewController( + reconciler.Options{ + KubeClientSet: kubeClient, + EventingClientSet: eventingClient, + Logger: logtesting.TestLogger(t), + }, + pipelineInformer, + &fakeAddressableInformer{}, + subscriptionInformer) + + if c == nil { + t.Fatalf("Failed to create with NewController") + } +} + +func createReplyChannel(channelName string) *corev1.ObjectReference { + return &corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: channelName, + } + +} + +func createChannel(pipelineName string, stepNumber int) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "kind": "inmemorychannel", + "metadata": map[string]interface{}{ + "creationTimestamp": nil, + "namespace": testNS, + "name": resources.PipelineChannelName(pipelineName, stepNumber), + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "messaging.knative.dev/v1alpha1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "Pipeline", + "name": pipelineName, + "uid": "", + }, + }, + }, + "spec": map[string]interface{}{}, + }, + } + +} + +func createSubscriber(stepNumber int) eventingv1alpha1.SubscriberSpec { + uriString := fmt.Sprintf("http://example.com/%d", stepNumber) + return eventingv1alpha1.SubscriberSpec{ + URI: &uriString, + } +} + +func TestAllCases(t *testing.T) { + pKey := testNS + "/" + pipelineName + imc := v1alpha1.ChannelTemplateSpec{ + metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + }, + runtime.RawExtension{Raw: []byte("{}")}, + } + + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { // TODO: there is a bug in the controller, it will query for "" + // Name: "trigger key not found ", + // Objects: []runtime.Object{ + // reconciletesting.NewTrigger(triggerName, testNS), + // }, + // Key: "foo/incomplete", + // WantErr: true, + // WantEvents: []string{ + // Eventf(corev1.EventTypeWarning, "ChannelReferenceFetchFailed", "Failed to validate spec.channel exists: s \"\" not found"), + // }, + }, { + Name: "deleting", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineDeleted)}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Pipeline reconciled"), + }, + }, { + Name: "singlestep", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0)}))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Pipeline reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(pipelineName, 0), + resources.NewSubscription(0, reconciletesting.NewPipeline(pipelineName, testNS, reconciletesting.WithPipelineChannelTemplateSpec(imc), reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0)}))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0)}), + reconciletesting.WithPipelineChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithPipelineAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithPipelineSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithPipelineChannelStatuses([]v1alpha1.PipelineChannelStatus{ + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 0), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + }), + reconciletesting.WithPipelineSubscriptionStatuses([]v1alpha1.PipelineSubscriptionStatus{ + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 0), + Namespace: testNS, + }, + }, + })), + }}, + }, { + Name: "singlestepwithreply", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0)}))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Pipeline reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(pipelineName, 0), + resources.NewSubscription(0, reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0)}))), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0)}), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithPipelineChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithPipelineSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithPipelineChannelStatuses([]v1alpha1.PipelineChannelStatus{ + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 0), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + }), + reconciletesting.WithPipelineSubscriptionStatuses([]v1alpha1.PipelineSubscriptionStatus{ + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 0), + Namespace: testNS, + }, + }, + })), + }}, + }, { + Name: "threestep", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{ + createSubscriber(0), + createSubscriber(1), + createSubscriber(2)}))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Pipeline reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(pipelineName, 0), + createChannel(pipelineName, 1), + createChannel(pipelineName, 2), + resources.NewSubscription(0, reconciletesting.NewPipeline(pipelineName, testNS, reconciletesting.WithPipelineChannelTemplateSpec(imc), reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0), createSubscriber(1), createSubscriber(2)}))), + resources.NewSubscription(1, reconciletesting.NewPipeline(pipelineName, testNS, reconciletesting.WithPipelineChannelTemplateSpec(imc), reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0), createSubscriber(1), createSubscriber(2)}))), + resources.NewSubscription(2, reconciletesting.NewPipeline(pipelineName, testNS, reconciletesting.WithPipelineChannelTemplateSpec(imc), reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0), createSubscriber(1), createSubscriber(2)})))}, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{ + createSubscriber(0), + createSubscriber(1), + createSubscriber(2), + }), + reconciletesting.WithPipelineChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithPipelineAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithPipelineSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithPipelineChannelStatuses([]v1alpha1.PipelineChannelStatus{ + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 0), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 1), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 2), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + }), + reconciletesting.WithPipelineSubscriptionStatuses([]v1alpha1.PipelineSubscriptionStatus{ + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 0), + Namespace: testNS, + }, + }, + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 1), + Namespace: testNS, + }, + }, + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 2), + Namespace: testNS, + }, + }, + })), + }}, + }, { + Name: "threestepwithreply", + Key: pKey, + Objects: []runtime.Object{ + reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{ + createSubscriber(0), + createSubscriber(1), + createSubscriber(2)}))}, + WantErr: false, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "Reconciled", "Pipeline reconciled"), + }, + WantCreates: []runtime.Object{ + createChannel(pipelineName, 0), + createChannel(pipelineName, 1), + createChannel(pipelineName, 2), + resources.NewSubscription(0, reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0), createSubscriber(1), createSubscriber(2)}))), + resources.NewSubscription(1, reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0), createSubscriber(1), createSubscriber(2)}))), + resources.NewSubscription(2, reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{createSubscriber(0), createSubscriber(1), createSubscriber(2)})))}, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewPipeline(pipelineName, testNS, + reconciletesting.WithInitPipelineConditions, + reconciletesting.WithPipelineReply(createReplyChannel(replyChannelName)), + reconciletesting.WithPipelineChannelTemplateSpec(imc), + reconciletesting.WithPipelineSteps([]eventingv1alpha1.SubscriberSpec{ + createSubscriber(0), + createSubscriber(1), + createSubscriber(2), + }), + reconciletesting.WithPipelineChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), + reconciletesting.WithPipelineAddressableNotReady("emptyHostname", "hostname is the empty string"), + reconciletesting.WithPipelineSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), + reconciletesting.WithPipelineChannelStatuses([]v1alpha1.PipelineChannelStatus{ + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 0), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 1), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + v1alpha1.PipelineChannelStatus{ + Channel: corev1.ObjectReference{ + APIVersion: "messaging.knative.dev/v1alpha1", + Kind: "inmemorychannel", + Name: resources.PipelineChannelName(pipelineName, 2), + Namespace: testNS, + }, + ReadyCondition: apis.Condition{ + Type: apis.ConditionReady, + Status: corev1.ConditionFalse, + Reason: "NotAddressable", + Message: "Channel is not addressable", + }, + }, + }), + reconciletesting.WithPipelineSubscriptionStatuses([]v1alpha1.PipelineSubscriptionStatus{ + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 0), + Namespace: testNS, + }, + }, + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 1), + Namespace: testNS, + }, + }, + v1alpha1.PipelineSubscriptionStatus{ + Subscription: corev1.ObjectReference{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + Name: resources.PipelineSubscriptionName(pipelineName, 2), + Namespace: testNS, + }, + }, + })), + }}, + }, + } + + defer logtesting.ClearAll() + + table.Test(t, reconciletesting.MakeFactory(func(listers *reconciletesting.Listers, opt reconciler.Options) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + pipelineLister: listers.GetPipelineLister(), + addressableInformer: &fakeAddressableInformer{}, + subscriptionLister: listers.GetSubscriptionLister(), + } + }, + false, + )) +} diff --git a/pkg/reconciler/pipeline/resources/channel.go b/pkg/reconciler/pipeline/resources/channel.go new file mode 100644 index 00000000000..8a62c7eac1e --- /dev/null +++ b/pkg/reconciler/pipeline/resources/channel.go @@ -0,0 +1,63 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "encoding/json" + "fmt" + + "github.com/knative/pkg/kmeta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// PipelineChannelName creates a name for the Channel fronting a specific step. +func PipelineChannelName(pipelineName string, step int) string { + return fmt.Sprintf("%s-kn-pipeline-%d", pipelineName, step) +} + +// NewChannel returns an unstructured.Unstructured based on the ChannelTemplateSpec +// for a given pipeline. +func NewChannel(name string, p *v1alpha1.Pipeline) (*unstructured.Unstructured, error) { + // Set the name of the resource we're creating as well as the namespace, etc. + template := v1alpha1.ChannelTemplateSpecInternal{ + metav1.TypeMeta{ + Kind: p.Spec.ChannelTemplate.Kind, + APIVersion: p.Spec.ChannelTemplate.APIVersion, + }, + metav1.ObjectMeta{ + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(p), + }, + Name: name, + Namespace: p.Namespace, + }, + p.Spec.ChannelTemplate.Spec, + } + raw, err := json.Marshal(template) + if err != nil { + return nil, err + } + u := &unstructured.Unstructured{} + err = json.Unmarshal(raw, u) + if err != nil { + return nil, err + } + return u, nil +} diff --git a/pkg/reconciler/pipeline/resources/subscription.go b/pkg/reconciler/pipeline/resources/subscription.go new file mode 100644 index 00000000000..f810bb58f5f --- /dev/null +++ b/pkg/reconciler/pipeline/resources/subscription.go @@ -0,0 +1,71 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + + "github.com/knative/pkg/kmeta" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + v1alpha1 "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func PipelineSubscriptionName(pipelineName string, step int) string { + return fmt.Sprintf("%s-kn-pipeline-%d", pipelineName, step) +} + +func NewSubscription(stepNumber int, p *v1alpha1.Pipeline) *eventingv1alpha1.Subscription { + r := &eventingv1alpha1.Subscription{ + TypeMeta: metav1.TypeMeta{ + Kind: "Subscription", + APIVersion: "eventing.knative.dev/v1alpha1", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: p.Namespace, + Name: PipelineSubscriptionName(p.Name, stepNumber), + + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(p), + }, + }, + Spec: eventingv1alpha1.SubscriptionSpec{ + Channel: corev1.ObjectReference{ + APIVersion: p.Spec.ChannelTemplate.APIVersion, + Kind: p.Spec.ChannelTemplate.Kind, + Name: PipelineChannelName(p.Name, stepNumber), + }, + Subscriber: &p.Spec.Steps[stepNumber], + }, + } + // If it's not the last step, use the next channel as the reply to, if it's the very + // last one, we'll use the (optional) reply from the Pipeline Spec. + if stepNumber < len(p.Spec.Steps)-1 { + r.Spec.Reply = &eventingv1alpha1.ReplyStrategy{ + Channel: &corev1.ObjectReference{ + APIVersion: p.Spec.ChannelTemplate.APIVersion, + Kind: p.Spec.ChannelTemplate.Kind, + Name: PipelineChannelName(p.Name, stepNumber+1), + }, + } + } else if p.Spec.Reply != nil { + r.Spec.Reply = &eventingv1alpha1.ReplyStrategy{Channel: p.Spec.Reply} + } + return r +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 70a2588997b..185e80b2429 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -136,6 +136,10 @@ func (l *Listers) GetChannelLister() eventinglisters.ChannelLister { return eventinglisters.NewChannelLister(l.indexerFor(&eventingv1alpha1.Channel{})) } +func (l *Listers) GetPipelineLister() messaginglisters.PipelineLister { + return messaginglisters.NewPipelineLister(l.indexerFor(&messagingv1alpha1.Pipeline{})) +} + func (l *Listers) GetCronJobSourceLister() sourcelisters.CronJobSourceLister { return sourcelisters.NewCronJobSourceLister(l.indexerFor(&sourcesv1alpha1.CronJobSource{})) } diff --git a/pkg/reconciler/testing/pipeline.go b/pkg/reconciler/testing/pipeline.go new file mode 100644 index 00000000000..1029397c383 --- /dev/null +++ b/pkg/reconciler/testing/pipeline.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "time" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// PipelineOption enables further configuration of a Pipeline. +type PipelineOption func(*v1alpha1.Pipeline) + +// NewPipeline creates an Pipeline with PipelineOptions. +func NewPipeline(name, namespace string, popt ...PipelineOption) *v1alpha1.Pipeline { + p := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: v1alpha1.PipelineSpec{}, + } + for _, opt := range popt { + opt(p) + } + p.SetDefaults(context.Background()) + return p +} + +func WithInitPipelineConditions(p *v1alpha1.Pipeline) { + p.Status.InitializeConditions() +} + +func WithPipelineDeleted(p *v1alpha1.Pipeline) { + deleteTime := metav1.NewTime(time.Unix(1e9, 0)) + p.ObjectMeta.SetDeletionTimestamp(&deleteTime) +} + +func WithPipelineChannelTemplateSpec(cts v1alpha1.ChannelTemplateSpec) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Spec.ChannelTemplate = cts + } +} + +func WithPipelineSteps(steps []eventingv1alpha1.SubscriberSpec) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Spec.Steps = steps + } +} + +func WithPipelineReply(reply *corev1.ObjectReference) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Spec.Reply = reply + } +} + +func WithPipelineSubscriptionStatuses(subscriptionStatuses []v1alpha1.PipelineSubscriptionStatus) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Status.SubscriptionStatuses = subscriptionStatuses + } +} + +func WithPipelineChannelStatuses(channelStatuses []v1alpha1.PipelineChannelStatus) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Status.ChannelStatuses = channelStatuses + } +} + +func WithPipelineChannelsNotReady(reason, message string) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Status.MarkChannelsNotReady(reason, message) + } +} + +func WithPipelineSubscriptionsNotReady(reason, message string) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Status.MarkSubscriptionsNotReady(reason, message) + } +} + +func WithPipelineAddressableNotReady(reason, message string) PipelineOption { + return func(p *v1alpha1.Pipeline) { + p.Status.MarkAddressableNotReady(reason, message) + } +}