diff --git a/cmd/sources-controller/main.go b/cmd/sources-controller/main.go index 6abe075e348..000cbf04c4a 100644 --- a/cmd/sources-controller/main.go +++ b/cmd/sources-controller/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/knative/eventing/pkg/reconciler/containersource" "k8s.io/client-go/tools/clientcmd" "log" @@ -63,7 +64,7 @@ func main() { logger = logger.With(zap.String("controller/impl", "pkg")) logger.Info("Starting the controller") - const numControllers = 1 + const numControllers = 2 cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) @@ -72,7 +73,8 @@ func main() { eventingInformerFactory := informers.NewSharedInformerFactory(opt.EventingClientSet, opt.ResyncPeriod) // Eventing - cronjobsourceInformer := eventingInformerFactory.Sources().V1alpha1().CronJobSources() + cronJobSourceInformer := eventingInformerFactory.Sources().V1alpha1().CronJobSources() + containerSourceInformer := eventingInformerFactory.Sources().V1alpha1().ContainerSources() // Kube deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() @@ -83,7 +85,12 @@ func main() { controllers := []*kncontroller.Impl{ cronjobsource.NewController( opt, - cronjobsourceInformer, + cronJobSourceInformer, + deploymentInformer, + ), + containersource.NewController( + opt, + containerSourceInformer, deploymentInformer, ), } @@ -104,7 +111,8 @@ func main() { if err := kncontroller.StartInformers( stopCh, // Eventing - cronjobsourceInformer.Informer(), + cronJobSourceInformer.Informer(), + containerSourceInformer.Informer(), // Kube deploymentInformer.Informer(), ); err != nil { diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index 44a8d3f1408..5be03648e78 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -79,4 +79,7 @@ rules: - "cronjobsources" - "cronjobsources/status" - "cronjobsources/finalizers" + - "containersources" + - "containersources/status" + - "containersources/finalizers" verbs: *everything diff --git a/config/300-containersource.yaml b/config/300-containersource.yaml new file mode 100644 index 00000000000..ecf50df75ce --- /dev/null +++ b/config/300-containersource.yaml @@ -0,0 +1,89 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + labels: + eventing.knative.dev/source: "true" + knative.dev/crd-install: "true" + name: containersources.sources.eventing.knative.dev +spec: + group: sources.eventing.knative.dev + names: + categories: + - all + - knative + - eventing + - sources + kind: ContainerSource + plural: containersources + scope: Namespaced + subresources: + status: {} + validation: + openAPIV3Schema: + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + properties: + args: + items: + type: string + type: array + env: + items: + type: object + type: array + image: + minLength: 1 + type: string + serviceAccountName: + type: string + sink: + type: object + type: object + status: + properties: + conditions: + items: + properties: + lastTransitionTime: + # we use a string in the stored object but a wrapper object + # at runtime. + type: string + message: + type: string + reason: + type: string + severity: + type: string + status: + type: string + type: + type: string + required: + - type + - status + type: object + type: array + sinkUri: + type: string + type: object + version: v1alpha1 diff --git a/pkg/apis/sources/v1alpha1/containersource_lifecycle.go b/pkg/apis/sources/v1alpha1/containersource_lifecycle.go new file mode 100644 index 00000000000..5848f9511ba --- /dev/null +++ b/pkg/apis/sources/v1alpha1/containersource_lifecycle.go @@ -0,0 +1,92 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +) + +const ( + // ContainerSourceConditionReady has status True when the ContainerSource is ready to send events. + ContainerConditionReady = duckv1alpha1.ConditionReady + + // ContainerConditionSinkProvided has status True when the ContainerSource has been configured with a sink target. + ContainerConditionSinkProvided duckv1alpha1.ConditionType = "SinkProvided" + + // ContainerConditionDeployed has status True when the ContainerSource has had it's deployment created. + ContainerConditionDeployed duckv1alpha1.ConditionType = "Deployed" +) + +var containerCondSet = duckv1alpha1.NewLivingConditionSet( + ContainerConditionSinkProvided, + ContainerConditionDeployed, +) + +// GetCondition returns the condition currently associated with the given type, or nil. +func (s *ContainerSourceStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { + return containerCondSet.Manage(s).GetCondition(t) +} + +// IsReady returns true if the resource is ready overall. +func (s *ContainerSourceStatus) IsReady() bool { + return containerCondSet.Manage(s).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (s *ContainerSourceStatus) InitializeConditions() { + containerCondSet.Manage(s).InitializeConditions() +} + +// MarkSink sets the condition that the source has a sink configured. +func (s *ContainerSourceStatus) MarkSink(uri string) { + s.SinkURI = uri + if len(uri) > 0 { + containerCondSet.Manage(s).MarkTrue(ContainerConditionSinkProvided) + } else { + containerCondSet.Manage(s).MarkUnknown(ContainerConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "") + } +} + +// MarkNoSink sets the condition that the source does not have a sink configured. +func (s *ContainerSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) { + containerCondSet.Manage(s).MarkFalse(ContainerConditionSinkProvided, reason, messageFormat, messageA...) +} + +// IsDeployed returns true if the Deployed condition has status true, otherwise +// false. +func (s *ContainerSourceStatus) IsDeployed() bool { + c := containerCondSet.Manage(s).GetCondition(ContainerConditionDeployed) + if c != nil { + return c.IsTrue() + } + return false +} + +// MarkDeployed sets the condition that the source has been deployed. +func (s *ContainerSourceStatus) MarkDeployed() { + containerCondSet.Manage(s).MarkTrue(ContainerConditionDeployed) +} + +// MarkDeploying sets the condition that the source is deploying. +func (s *ContainerSourceStatus) MarkDeploying(reason, messageFormat string, messageA ...interface{}) { + containerCondSet.Manage(s).MarkUnknown(ContainerConditionDeployed, reason, messageFormat, messageA...) +} + +// MarkNotDeployed sets the condition that the source has not been deployed. +func (s *ContainerSourceStatus) MarkNotDeployed(reason, messageFormat string, messageA ...interface{}) { + containerCondSet.Manage(s).MarkFalse(ContainerConditionDeployed, reason, messageFormat, messageA...) +} diff --git a/pkg/apis/sources/v1alpha1/containersource_lifecycle_test.go b/pkg/apis/sources/v1alpha1/containersource_lifecycle_test.go new file mode 100644 index 00000000000..cde7b66f270 --- /dev/null +++ b/pkg/apis/sources/v1alpha1/containersource_lifecycle_test.go @@ -0,0 +1,448 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/google/go-cmp/cmp" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" +) + +func TestContainerSourceStatusIsReady(t *testing.T) { + tests := []struct { + name string + s *ContainerSourceStatus + want bool + }{{ + name: "uninitialized", + s: &ContainerSourceStatus{}, + want: false, + }, { + name: "initialized", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + return s + }(), + want: false, + }, { + name: "mark deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkDeployed() + return s + }(), + want: false, + }, { + name: "mark sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + return s + }(), + want: false, + }, { + name: "mark sink and deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + return s + }(), + want: true, + }, { + name: "mark sink and deployed then no sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkNoSink("Testing", "") + return s + }(), + want: false, + }, { + name: "mark sink and deployed then deploying", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkDeploying("Testing", "") + return s + }(), + want: false, + }, { + name: "mark sink and deployed then not deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkNotDeployed("Testing", "") + return s + }(), + want: false, + }, { + name: "mark sink and not deployed then deploying then deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkNotDeployed("MarkNotDeployed", "") + s.MarkDeploying("MarkDeploying", "") + s.MarkDeployed() + return s + }(), + want: true, + }, { + name: "mark sink empty and deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("") + s.MarkDeployed() + return s + }(), + want: false, + }, { + name: "mark sink empty and deployed then sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("") + s.MarkDeployed() + s.MarkSink("uri://example") + return s + }(), + want: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.IsReady() + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("%s: unexpected condition (-want, +got) = %v", test.name, diff) + } + }) + } +} + +func TestContainerSourceStatusGetCondition(t *testing.T) { + tests := []struct { + name string + s *ContainerSourceStatus + condQuery duckv1alpha1.ConditionType + want *duckv1alpha1.Condition + }{{ + name: "uninitialized", + s: &ContainerSourceStatus{}, + condQuery: ContainerConditionReady, + want: nil, + }, { + name: "initialized", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionUnknown, + }, + }, { + name: "mark deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkDeployed() + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionUnknown, + }, + }, { + name: "mark sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionUnknown, + }, + }, { + name: "mark sink and deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionTrue, + }, + }, { + name: "mark sink and deployed then no sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkNoSink("Testing", "hi%s", "") + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionFalse, + Reason: "Testing", + Message: "hi", + }, + }, { + name: "mark sink and deployed then deploying", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkDeploying("Testing", "hi%s", "") + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionUnknown, + Reason: "Testing", + Message: "hi", + }, + }, { + name: "mark sink and deployed then not deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkNotDeployed("Testing", "hi%s", "") + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionFalse, + Reason: "Testing", + Message: "hi", + }, + }, { + name: "mark sink and not deployed then deploying then deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkNotDeployed("MarkNotDeployed", "%s", "") + s.MarkDeploying("MarkDeploying", "%s", "") + s.MarkDeployed() + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionTrue, + }, + }, { + name: "mark sink empty and deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("") + s.MarkDeployed() + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionUnknown, + Reason: "SinkEmpty", + Message: "Sink has resolved to empty.", + }, + }, { + name: "mark sink empty and deployed then sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("") + s.MarkDeployed() + s.MarkSink("uri://example") + return s + }(), + condQuery: ContainerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ContainerConditionReady, + Status: corev1.ConditionTrue, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.GetCondition(test.condQuery) + ignoreTime := cmpopts.IgnoreFields(duckv1alpha1.Condition{}, + "LastTransitionTime", "Severity") + if diff := cmp.Diff(test.want, got, ignoreTime); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + } +} + +func TestContainerSourceStatusIsDeployed(t *testing.T) { + tests := []struct { + name string + s *ContainerSourceStatus + want bool + }{{ + name: "uninitialized", + s: &ContainerSourceStatus{}, + want: false, + }, { + name: "initialized", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + return s + }(), + want: false, + }, { + name: "mark deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkDeployed() + return s + }(), + want: true, + }, { + name: "mark sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + return s + }(), + want: false, + }, { + name: "mark sink and deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + return s + }(), + want: true, + }, { + name: "mark sink and deployed then no sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkNoSink("Testing", "") + return s + }(), + want: true, + }, { + name: "mark sink and deployed then deploying", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkDeploying("Testing", "") + return s + }(), + want: false, + }, { + name: "mark sink and deployed then not deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + s.MarkNotDeployed("Testing", "") + return s + }(), + want: false, + }, { + name: "mark sink and not deployed then deploying then deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkNotDeployed("MarkNotDeployed", "") + s.MarkDeploying("MarkDeploying", "") + s.MarkDeployed() + return s + }(), + want: true, + }, { + name: "mark sink empty and deployed", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("") + s.MarkDeployed() + return s + }(), + want: true, + }, { + name: "mark sink empty and deployed then sink", + s: func() *ContainerSourceStatus { + s := &ContainerSourceStatus{} + s.InitializeConditions() + s.MarkSink("") + s.MarkDeployed() + s.MarkSink("uri://example") + return s + }(), + want: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.IsDeployed() + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("%s: unexpected condition (-want, +got) = %v", test.name, diff) + } + }) + } +} diff --git a/pkg/apis/sources/v1alpha1/containersource_types.go b/pkg/apis/sources/v1alpha1/containersource_types.go new file mode 100644 index 00000000000..bad9c1b2639 --- /dev/null +++ b/pkg/apis/sources/v1alpha1/containersource_types.go @@ -0,0 +1,97 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "github.com/knative/pkg/apis/duck" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +k8s:defaulter-gen=true + +// ContainerSource is the Schema for the containersources API +type ContainerSource struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec ContainerSourceSpec `json:"spec,omitempty"` + Status ContainerSourceStatus `json:"status,omitempty"` +} + +// Check that ContainerSource can be validated and can be defaulted. +var _ runtime.Object = (*ContainerSource)(nil) + +// Check that ContainerSource implements the Conditions duck type. +var _ = duck.VerifyType(&ContainerSource{}, &duckv1alpha1.Conditions{}) + +// ContainerSourceSpec defines the desired state of ContainerSource +type ContainerSourceSpec struct { + // Image is the image to run inside of the container. + // +kubebuilder:validation:MinLength=1 + Image string `json:"image,omitempty"` + + // Args are passed to the ContainerSpec as they are. + Args []string `json:"args,omitempty"` + + // Env is the list of environment variables to set in the container. + // Cannot be updated. + // +optional + // +patchMergeKey=name + // +patchStrategy=merge + Env []corev1.EnvVar `json:"env,omitempty" patchStrategy:"merge" patchMergeKey:"name"` + + // ServiceAccountName is the name of the ServiceAccount to use to run this + // source. + // +optional + ServiceAccountName string `json:"serviceAccountName,omitempty"` + + // Sink is a reference to an object that will resolve to a domain name to use as the sink. + // +optional + Sink *corev1.ObjectReference `json:"sink,omitempty"` +} + +// GetGroupVersionKind returns the GroupVersionKind. +func (s *ContainerSource) GetGroupVersionKind() schema.GroupVersionKind { + return SchemeGroupVersion.WithKind("ContainerSource") +} + +// ContainerSourceStatus defines the observed state of ContainerSource +type ContainerSourceStatus struct { + // inherits duck/v1alpha1 Status, which currently provides: + // * ObservedGeneration - the 'Generation' of the Service that was last processed by the controller. + // * Conditions - the latest available observations of a resource's current state. + duckv1alpha1.Status `json:",inline"` + + // SinkURI is the current active sink URI that has been configured for the ContainerSource. + // +optional + SinkURI string `json:"sinkUri,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ContainerSourceList contains a list of ContainerSource +type ContainerSourceList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []ContainerSource `json:"items"` +} diff --git a/pkg/apis/sources/v1alpha1/register.go b/pkg/apis/sources/v1alpha1/register.go index a170bc79c54..83556830ff3 100644 --- a/pkg/apis/sources/v1alpha1/register.go +++ b/pkg/apis/sources/v1alpha1/register.go @@ -47,6 +47,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { scheme.AddKnownTypes(SchemeGroupVersion, &CronJobSource{}, &CronJobSourceList{}, + &ContainerSource{}, + &ContainerSourceList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/sources/v1alpha1/register_test.go b/pkg/apis/sources/v1alpha1/register_test.go index 2c4e17d8b5e..c111fda185c 100644 --- a/pkg/apis/sources/v1alpha1/register_test.go +++ b/pkg/apis/sources/v1alpha1/register_test.go @@ -62,6 +62,8 @@ func TestKnownTypes(t *testing.T) { for _, name := range []string{ "CronJobSource", "CronJobSourceList", + "ContainerSource", + "ContainerSourceList", } { if _, ok := types[name]; !ok { t.Errorf("Did not find %q as registered type", name) diff --git a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 686c603a5e3..29ee5bbe035 100644 --- a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,117 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContainerSource) DeepCopyInto(out *ContainerSource) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSource. +func (in *ContainerSource) DeepCopy() *ContainerSource { + if in == nil { + return nil + } + out := new(ContainerSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ContainerSource) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContainerSourceList) DeepCopyInto(out *ContainerSourceList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ContainerSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSourceList. +func (in *ContainerSourceList) DeepCopy() *ContainerSourceList { + if in == nil { + return nil + } + out := new(ContainerSourceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ContainerSourceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContainerSourceSpec) DeepCopyInto(out *ContainerSourceSpec) { + *out = *in + if in.Args != nil { + in, out := &in.Args, &out.Args + *out = make([]string, len(*in)) + copy(*out, *in) + } + if in.Env != nil { + in, out := &in.Env, &out.Env + *out = make([]v1.EnvVar, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(v1.ObjectReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSourceSpec. +func (in *ContainerSourceSpec) DeepCopy() *ContainerSourceSpec { + if in == nil { + return nil + } + out := new(ContainerSourceSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ContainerSourceStatus) DeepCopyInto(out *ContainerSourceStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ContainerSourceStatus. +func (in *ContainerSourceStatus) DeepCopy() *ContainerSourceStatus { + if in == nil { + return nil + } + out := new(ContainerSourceStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CronJobSource) DeepCopyInto(out *CronJobSource) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/containersource.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/containersource.go new file mode 100644 index 00000000000..d0f935e2a85 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/containersource.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ContainerSourcesGetter has a method to return a ContainerSourceInterface. +// A group's client should implement this interface. +type ContainerSourcesGetter interface { + ContainerSources(namespace string) ContainerSourceInterface +} + +// ContainerSourceInterface has methods to work with ContainerSource resources. +type ContainerSourceInterface interface { + Create(*v1alpha1.ContainerSource) (*v1alpha1.ContainerSource, error) + Update(*v1alpha1.ContainerSource) (*v1alpha1.ContainerSource, error) + UpdateStatus(*v1alpha1.ContainerSource) (*v1alpha1.ContainerSource, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.ContainerSource, error) + List(opts v1.ListOptions) (*v1alpha1.ContainerSourceList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ContainerSource, err error) + ContainerSourceExpansion +} + +// containerSources implements ContainerSourceInterface +type containerSources struct { + client rest.Interface + ns string +} + +// newContainerSources returns a ContainerSources +func newContainerSources(c *SourcesV1alpha1Client, namespace string) *containerSources { + return &containerSources{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the containerSource, and returns the corresponding containerSource object, and an error if there is any. +func (c *containerSources) Get(name string, options v1.GetOptions) (result *v1alpha1.ContainerSource, err error) { + result = &v1alpha1.ContainerSource{} + err = c.client.Get(). + Namespace(c.ns). + Resource("containersources"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of ContainerSources that match those selectors. +func (c *containerSources) List(opts v1.ListOptions) (result *v1alpha1.ContainerSourceList, err error) { + result = &v1alpha1.ContainerSourceList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("containersources"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested containerSources. +func (c *containerSources) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("containersources"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a containerSource and creates it. Returns the server's representation of the containerSource, and an error, if there is any. +func (c *containerSources) Create(containerSource *v1alpha1.ContainerSource) (result *v1alpha1.ContainerSource, err error) { + result = &v1alpha1.ContainerSource{} + err = c.client.Post(). + Namespace(c.ns). + Resource("containersources"). + Body(containerSource). + Do(). + Into(result) + return +} + +// Update takes the representation of a containerSource and updates it. Returns the server's representation of the containerSource, and an error, if there is any. +func (c *containerSources) Update(containerSource *v1alpha1.ContainerSource) (result *v1alpha1.ContainerSource, err error) { + result = &v1alpha1.ContainerSource{} + err = c.client.Put(). + Namespace(c.ns). + Resource("containersources"). + Name(containerSource.Name). + Body(containerSource). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *containerSources) UpdateStatus(containerSource *v1alpha1.ContainerSource) (result *v1alpha1.ContainerSource, err error) { + result = &v1alpha1.ContainerSource{} + err = c.client.Put(). + Namespace(c.ns). + Resource("containersources"). + Name(containerSource.Name). + SubResource("status"). + Body(containerSource). + Do(). + Into(result) + return +} + +// Delete takes name of the containerSource and deletes it. Returns an error if one occurs. +func (c *containerSources) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("containersources"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *containerSources) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("containersources"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched containerSource. +func (c *containerSources) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ContainerSource, err error) { + result = &v1alpha1.ContainerSource{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("containersources"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_containersource.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_containersource.go new file mode 100644 index 00000000000..18ca2d1555e --- /dev/null +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_containersource.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeContainerSources implements ContainerSourceInterface +type FakeContainerSources struct { + Fake *FakeSourcesV1alpha1 + ns string +} + +var containersourcesResource = schema.GroupVersionResource{Group: "sources.eventing.knative.dev", Version: "v1alpha1", Resource: "containersources"} + +var containersourcesKind = schema.GroupVersionKind{Group: "sources.eventing.knative.dev", Version: "v1alpha1", Kind: "ContainerSource"} + +// Get takes name of the containerSource, and returns the corresponding containerSource object, and an error if there is any. +func (c *FakeContainerSources) Get(name string, options v1.GetOptions) (result *v1alpha1.ContainerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(containersourcesResource, c.ns, name), &v1alpha1.ContainerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ContainerSource), err +} + +// List takes label and field selectors, and returns the list of ContainerSources that match those selectors. +func (c *FakeContainerSources) List(opts v1.ListOptions) (result *v1alpha1.ContainerSourceList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(containersourcesResource, containersourcesKind, c.ns, opts), &v1alpha1.ContainerSourceList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.ContainerSourceList{ListMeta: obj.(*v1alpha1.ContainerSourceList).ListMeta} + for _, item := range obj.(*v1alpha1.ContainerSourceList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested containerSources. +func (c *FakeContainerSources) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(containersourcesResource, c.ns, opts)) + +} + +// Create takes the representation of a containerSource and creates it. Returns the server's representation of the containerSource, and an error, if there is any. +func (c *FakeContainerSources) Create(containerSource *v1alpha1.ContainerSource) (result *v1alpha1.ContainerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(containersourcesResource, c.ns, containerSource), &v1alpha1.ContainerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ContainerSource), err +} + +// Update takes the representation of a containerSource and updates it. Returns the server's representation of the containerSource, and an error, if there is any. +func (c *FakeContainerSources) Update(containerSource *v1alpha1.ContainerSource) (result *v1alpha1.ContainerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(containersourcesResource, c.ns, containerSource), &v1alpha1.ContainerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ContainerSource), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeContainerSources) UpdateStatus(containerSource *v1alpha1.ContainerSource) (*v1alpha1.ContainerSource, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(containersourcesResource, "status", c.ns, containerSource), &v1alpha1.ContainerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ContainerSource), err +} + +// Delete takes name of the containerSource and deletes it. Returns an error if one occurs. +func (c *FakeContainerSources) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(containersourcesResource, c.ns, name), &v1alpha1.ContainerSource{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeContainerSources) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(containersourcesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.ContainerSourceList{}) + return err +} + +// Patch applies the patch and returns the patched containerSource. +func (c *FakeContainerSources) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ContainerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(containersourcesResource, c.ns, name, data, subresources...), &v1alpha1.ContainerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ContainerSource), err +} diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go index c2d9faeb94a..2742a8e7195 100644 --- a/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go @@ -28,6 +28,10 @@ type FakeSourcesV1alpha1 struct { *testing.Fake } +func (c *FakeSourcesV1alpha1) ContainerSources(namespace string) v1alpha1.ContainerSourceInterface { + return &FakeContainerSources{c, namespace} +} + func (c *FakeSourcesV1alpha1) CronJobSources(namespace string) v1alpha1.CronJobSourceInterface { return &FakeCronJobSources{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go index e3e7bf27492..b250cd0c5e3 100644 --- a/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go @@ -18,4 +18,6 @@ limitations under the License. package v1alpha1 +type ContainerSourceExpansion interface{} + type CronJobSourceExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go index 31733ad3ea4..dbbdbdf0392 100644 --- a/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go @@ -27,6 +27,7 @@ import ( type SourcesV1alpha1Interface interface { RESTClient() rest.Interface + ContainerSourcesGetter CronJobSourcesGetter } @@ -35,6 +36,10 @@ type SourcesV1alpha1Client struct { restClient rest.Interface } +func (c *SourcesV1alpha1Client) ContainerSources(namespace string) ContainerSourceInterface { + return newContainerSources(c, namespace) +} + func (c *SourcesV1alpha1Client) CronJobSources(namespace string) CronJobSourceInterface { return newCronJobSources(c, namespace) } diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index 013b0b1e9fc..48066085062 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -66,6 +66,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Eventing().V1alpha1().Triggers().Informer()}, nil // Group=sources.eventing.knative.dev, Version=v1alpha1 + case sourcesv1alpha1.SchemeGroupVersion.WithResource("containersources"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Sources().V1alpha1().ContainerSources().Informer()}, nil case sourcesv1alpha1.SchemeGroupVersion.WithResource("cronjobsources"): return &genericInformer{resource: resource.GroupResource(), informer: f.Sources().V1alpha1().CronJobSources().Informer()}, nil diff --git a/pkg/client/informers/externalversions/sources/v1alpha1/containersource.go b/pkg/client/informers/externalversions/sources/v1alpha1/containersource.go new file mode 100644 index 00000000000..e5b9f70653b --- /dev/null +++ b/pkg/client/informers/externalversions/sources/v1alpha1/containersource.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/sources/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ContainerSourceInformer provides access to a shared informer and lister for +// ContainerSources. +type ContainerSourceInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.ContainerSourceLister +} + +type containerSourceInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewContainerSourceInformer constructs a new informer for ContainerSource type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewContainerSourceInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredContainerSourceInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredContainerSourceInformer constructs a new informer for ContainerSource type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredContainerSourceInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SourcesV1alpha1().ContainerSources(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SourcesV1alpha1().ContainerSources(namespace).Watch(options) + }, + }, + &sourcesv1alpha1.ContainerSource{}, + resyncPeriod, + indexers, + ) +} + +func (f *containerSourceInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredContainerSourceInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *containerSourceInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&sourcesv1alpha1.ContainerSource{}, f.defaultInformer) +} + +func (f *containerSourceInformer) Lister() v1alpha1.ContainerSourceLister { + return v1alpha1.NewContainerSourceLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/sources/v1alpha1/interface.go b/pkg/client/informers/externalversions/sources/v1alpha1/interface.go index 244340025a8..9647603c828 100644 --- a/pkg/client/informers/externalversions/sources/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/sources/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { + // ContainerSources returns a ContainerSourceInformer. + ContainerSources() ContainerSourceInformer // CronJobSources returns a CronJobSourceInformer. CronJobSources() CronJobSourceInformer } @@ -39,6 +41,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } +// ContainerSources returns a ContainerSourceInformer. +func (v *version) ContainerSources() ContainerSourceInformer { + return &containerSourceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // CronJobSources returns a CronJobSourceInformer. func (v *version) CronJobSources() CronJobSourceInformer { return &cronJobSourceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/listers/sources/v1alpha1/containersource.go b/pkg/client/listers/sources/v1alpha1/containersource.go new file mode 100644 index 00000000000..de44136ec26 --- /dev/null +++ b/pkg/client/listers/sources/v1alpha1/containersource.go @@ -0,0 +1,94 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ContainerSourceLister helps list ContainerSources. +type ContainerSourceLister interface { + // List lists all ContainerSources in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.ContainerSource, err error) + // ContainerSources returns an object that can list and get ContainerSources. + ContainerSources(namespace string) ContainerSourceNamespaceLister + ContainerSourceListerExpansion +} + +// containerSourceLister implements the ContainerSourceLister interface. +type containerSourceLister struct { + indexer cache.Indexer +} + +// NewContainerSourceLister returns a new ContainerSourceLister. +func NewContainerSourceLister(indexer cache.Indexer) ContainerSourceLister { + return &containerSourceLister{indexer: indexer} +} + +// List lists all ContainerSources in the indexer. +func (s *containerSourceLister) List(selector labels.Selector) (ret []*v1alpha1.ContainerSource, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.ContainerSource)) + }) + return ret, err +} + +// ContainerSources returns an object that can list and get ContainerSources. +func (s *containerSourceLister) ContainerSources(namespace string) ContainerSourceNamespaceLister { + return containerSourceNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// ContainerSourceNamespaceLister helps list and get ContainerSources. +type ContainerSourceNamespaceLister interface { + // List lists all ContainerSources in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.ContainerSource, err error) + // Get retrieves the ContainerSource from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.ContainerSource, error) + ContainerSourceNamespaceListerExpansion +} + +// containerSourceNamespaceLister implements the ContainerSourceNamespaceLister +// interface. +type containerSourceNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all ContainerSources in the indexer for a given namespace. +func (s containerSourceNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.ContainerSource, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.ContainerSource)) + }) + return ret, err +} + +// Get retrieves the ContainerSource from the indexer for a given namespace and name. +func (s containerSourceNamespaceLister) Get(name string) (*v1alpha1.ContainerSource, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("containersource"), name) + } + return obj.(*v1alpha1.ContainerSource), nil +} diff --git a/pkg/client/listers/sources/v1alpha1/expansion_generated.go b/pkg/client/listers/sources/v1alpha1/expansion_generated.go index 444c53f7032..49fe2bab0ae 100644 --- a/pkg/client/listers/sources/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/sources/v1alpha1/expansion_generated.go @@ -18,6 +18,14 @@ limitations under the License. package v1alpha1 +// ContainerSourceListerExpansion allows custom methods to be added to +// ContainerSourceLister. +type ContainerSourceListerExpansion interface{} + +// ContainerSourceNamespaceListerExpansion allows custom methods to be added to +// ContainerSourceNamespaceLister. +type ContainerSourceNamespaceListerExpansion interface{} + // CronJobSourceListerExpansion allows custom methods to be added to // CronJobSourceLister. type CronJobSourceListerExpansion interface{} diff --git a/pkg/reconciler/containersource/containersource.go b/pkg/reconciler/containersource/containersource.go new file mode 100644 index 00000000000..9535349ee7a --- /dev/null +++ b/pkg/reconciler/containersource/containersource.go @@ -0,0 +1,316 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package containersource + +import ( + "context" + "fmt" + "reflect" + "strings" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + appsv1informers "k8s.io/client-go/informers/apps/v1" + appsv1listers "k8s.io/client-go/listers/apps/v1" + "k8s.io/client-go/tools/cache" + + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + sourceinformers "github.com/knative/eventing/pkg/client/informers/externalversions/sources/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/sources/v1alpha1" + "github.com/knative/eventing/pkg/duck" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/containersource/resources" + "github.com/knative/pkg/controller" + "github.com/knative/pkg/logging" + "go.uber.org/zap" +) + +const ( + // ReconcilerName is the name of the reconciler + ReconcilerName = "ContainerSources" + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "container-source-controller" + + // Name of the corev1.Events emitted from the reconciliation process + sourceReconciled = "ContainerSourceReconciled" + sourceUpdateStatusFailed = "ContainerSourceUpdateStatusFailed" +) + +type Reconciler struct { + *reconciler.Base + + // listers index properties about resources + containerSourceLister listers.ContainerSourceLister + deploymentLister appsv1listers.DeploymentLister +} + +// Check that our Reconciler implements controller.Reconciler +var _ controller.Reconciler = (*Reconciler)(nil) + +// NewController initializes the controller and is called by the generated code +// Registers event handlers to enqueue events +func NewController( + opt reconciler.Options, + containerSourceInformer sourceinformers.ContainerSourceInformer, + deploymentInformer appsv1informers.DeploymentInformer, +) *controller.Impl { + r := &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + containerSourceLister: containerSourceInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName, reconciler.MustNewStatsReporter(ReconcilerName, r.Logger)) + + r.Logger.Info("Setting up event handlers") + containerSourceInformer.Informer().AddEventHandler(reconciler.Handler(impl.Enqueue)) + + deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("ContainerSource")), + Handler: reconciler.Handler(impl.EnqueueControllerOf), + }) + + return impl +} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the CronJobSource +// resource with the current status of the resource. +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + r.Logger.Errorf("invalid resource key: %s", key) + return nil + } + + // Get the CronJobSource resource with this namespace/name + original, err := r.containerSourceLister.ContainerSources(namespace).Get(name) + if apierrors.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("ContainerSource key in work queue no longer exists", zap.Any("key", key)) + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy + source := original.DeepCopy() + + // Reconcile this copy of the ContainerSource and then write back any status + // updates regardless of whether the reconcile error out. + err = r.reconcile(ctx, source) + if err != nil { + logging.FromContext(ctx).Warn("Error reconciling ContainerSource", zap.Error(err)) + } else { + logging.FromContext(ctx).Debug("ContainerSource reconciled") + r.Recorder.Eventf(source, corev1.EventTypeNormal, sourceReconciled, `ContainerSource reconciled: "%s/%s"`, source.Namespace, source.Name) + } + + if _, updateStatusErr := r.updateStatus(ctx, source.DeepCopy()); updateStatusErr != nil { + logging.FromContext(ctx).Warn("Failed to update the ContainerSource", zap.Error(err)) + r.Recorder.Eventf(source, corev1.EventTypeWarning, sourceUpdateStatusFailed, "Failed to update ContainerSource's status: %v", err) + return updateStatusErr + } + + // Requeue if the resource is not ready: + return err +} + +func (r *Reconciler) reconcile(ctx context.Context, source *v1alpha1.ContainerSource) error { + // No need to reconcile if the source has been marked for deletion. + if source.DeletionTimestamp != nil { + return nil + } + + source.Status.InitializeConditions() + + annotations := make(map[string]string) + // Then wire through any annotations / labels from the Source + if source.ObjectMeta.Annotations != nil { + for k, v := range source.ObjectMeta.Annotations { + annotations[k] = v + } + } + labels := make(map[string]string) + if source.ObjectMeta.Labels != nil { + for k, v := range source.ObjectMeta.Labels { + labels[k] = v + } + } + + args := resources.ContainerArguments{ + Source: source, + Name: source.Name, + Namespace: source.Namespace, + Image: source.Spec.Image, + Args: source.Spec.Args, + Env: source.Spec.Env, + ServiceAccountName: source.Spec.ServiceAccountName, + Annotations: annotations, + Labels: labels, + } + + err := r.setSinkURIArg(ctx, source, &args) + if err != nil { + r.Recorder.Eventf(source, corev1.EventTypeWarning, "SetSinkURIFailed", "Failed to set Sink URI: %v", err) + return err + } + + deploy, err := r.getDeployment(ctx, source) + if err != nil { + if errors.IsNotFound(err) { + deploy, err = r.createDeployment(ctx, source, nil, args) + if err != nil { + r.markNotDeployedRecordEvent(source, corev1.EventTypeWarning, "DeploymentCreateFailed", "Could not create deployment: %v", err) + return err + } + r.markDeployingAndRecordEvent(source, corev1.EventTypeNormal, "DeploymentCreated", "Created deployment %q", deploy.Name) + // Since the Deployment has just been created, there's nothing more + // to do until it gets a status. This ContainerSource will be reconciled + // again when the Deployment is updated. + return nil + } + // Something unexpected happened getting the deployment. + r.markDeployingAndRecordEvent(source, corev1.EventTypeWarning, "DeploymentGetFailed", "Error getting deployment: %v", err) + return err + } + + // Update Deployment spec if it's changed + expected := resources.MakeDeployment(args) + // Since the Deployment spec has fields defaulted by the webhook, it won't + // be equal to expected. Use DeepDerivative to compare only the fields that + // are set in expected. + if !equality.Semantic.DeepDerivative(expected.Spec, deploy.Spec) { + deploy.Spec = expected.Spec + deploy, err := r.KubeClientSet.AppsV1().Deployments(deploy.Namespace).Update(deploy) + if err != nil { + r.markDeployingAndRecordEvent(source, corev1.EventTypeWarning, "DeploymentUpdateFailed", "Failed to update deployment %q: %v", deploy.Name, err) + } else { + r.markDeployingAndRecordEvent(source, corev1.EventTypeNormal, "DeploymentUpdated", "Updated deployment %q", deploy.Name) + } + // Return after this update or error and reconcile again + return err + } + + // Update source status + if deploy.Status.ReadyReplicas > 0 && !source.Status.IsDeployed() { + source.Status.MarkDeployed() + r.Recorder.Eventf(source, corev1.EventTypeNormal, "DeploymentReady", "Deployment %q has %d ready replicas", deploy.Name, deploy.Status.ReadyReplicas) + } + + return nil +} + +// setSinkURIArg attempts to get the sink URI from the sink reference and +// set it in the source status. On failure, the source's Sink condition is +// updated to reflect the error. +// If an error is returned from this function, the caller should also record +// an Event containing the error string. +func (r *Reconciler) setSinkURIArg(ctx context.Context, source *v1alpha1.ContainerSource, args *resources.ContainerArguments) error { + if uri, ok := sinkArg(source); ok { + args.SinkInArgs = true + source.Status.MarkSink(uri) + return nil + } + + if source.Spec.Sink == nil { + source.Status.MarkNoSink("Missing", "Sink missing from spec") + return fmt.Errorf("Sink missing from spec") + } + + uri, err := duck.GetSinkURI(ctx, r.DynamicClientSet, source.Spec.Sink, source.Namespace) + if err != nil { + source.Status.MarkNoSink("NotFound", `Couldn't get Sink URI from "%s/%s": %v"`, source.Spec.Sink.Namespace, source.Spec.Sink.Name, err) + return err + } + source.Status.MarkSink(uri) + args.Sink = uri + + return nil +} + +func sinkArg(source *v1alpha1.ContainerSource) (string, bool) { + for _, a := range source.Spec.Args { + if strings.HasPrefix(a, "--sink=") { + return strings.Replace(a, "--sink=", "", -1), true + } + } + return "", false +} + +func (r *Reconciler) getDeployment(ctx context.Context, source *v1alpha1.ContainerSource) (*appsv1.Deployment, error) { + dl, err := r.KubeClientSet.AppsV1().Deployments(source.Namespace).List(metav1.ListOptions{}) + if err != nil { + r.Logger.Errorf("Unable to list deployments: %v", zap.Error(err)) + return nil, err + } + for _, c := range dl.Items { + if metav1.IsControlledBy(&c, source) { + return &c, nil + } + } + return nil, errors.NewNotFound(schema.GroupResource{}, "") +} + +func (r *Reconciler) createDeployment(ctx context.Context, source *v1alpha1.ContainerSource, org *appsv1.Deployment, args resources.ContainerArguments) (*appsv1.Deployment, error) { + deployment := resources.MakeDeployment(args) + return r.KubeClientSet.AppsV1().Deployments(source.Namespace).Create(deployment) +} + +func (r *Reconciler) markDeployingAndRecordEvent(source *v1alpha1.ContainerSource, evType string, reason string, messageFmt string, args ...interface{}) { + r.Recorder.Eventf(source, evType, reason, messageFmt, args...) + source.Status.MarkDeploying(reason, messageFmt, args...) +} + +func (r *Reconciler) markNotDeployedRecordEvent(source *v1alpha1.ContainerSource, evType string, reason string, messageFmt string, args ...interface{}) { + r.Recorder.Eventf(source, evType, reason, messageFmt, args...) + source.Status.MarkNotDeployed(reason, messageFmt, args...) +} + +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.ContainerSource) (*v1alpha1.ContainerSource, error) { + source, err := r.containerSourceLister.ContainerSources(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + // If there's nothing to update, just return. + if reflect.DeepEqual(source.Status, desired.Status) { + return source, nil + } + + becomesReady := desired.Status.IsReady() && !source.Status.IsReady() + + // Don't modify the informers copy. + existing := source.DeepCopy() + existing.Status = desired.Status + + cj, err := r.EventingClientSet.SourcesV1alpha1().ContainerSources(desired.Namespace).UpdateStatus(existing) + if err == nil && becomesReady { + duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time) + r.Logger.Infof("ContainerSource %q became ready after %v", source.Name, duration) + //r.StatsReporter.ReportServiceReady(subscription.Namespace, subscription.Name, duration) // TODO: stats + } + + return cj, err +} diff --git a/pkg/reconciler/containersource/containersource_test.go b/pkg/reconciler/containersource/containersource_test.go new file mode 100644 index 00000000000..eb5a9d8b02b --- /dev/null +++ b/pkg/reconciler/containersource/containersource_test.go @@ -0,0 +1,509 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Veroute.on 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package containersource + +import ( + "fmt" + "testing" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubeinformers "k8s.io/client-go/informers" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" + + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/utils" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + + . "github.com/knative/eventing/pkg/reconciler/testing" + . "github.com/knative/pkg/reconciler/testing" +) + +const ( + image = "github.com/knative/test/image" + sourceName = "test-container-source" + sourceUID = "1234-5678-90" + testNS = "testnamespace" + sinkName = "testsink" +) + +var ( + trueVal = true + + sinkRef = corev1.ObjectReference{ + Name: sinkName, + Kind: "Channel", + APIVersion: "eventing.knative.dev/v1alpha1", + } + nonsinkRef = corev1.ObjectReference{ + Name: sinkName, + Kind: "Trigger", + APIVersion: "eventing.knative.dev/v1alpha1", + } + sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName() + sinkURI = "http://" + sinkDNS + "/" + + // TODO: k8s service does not work, fix. + //serviceRef = corev1.ObjectReference{ + // Name: sinkName, + // Kind: "Service", + // APIVersion: "v1", + //} + //serviceURI = "http://service.sink.svc.cluster.local/" +) + +func init() { + // Add types to scheme + _ = appsv1.AddToScheme(scheme.Scheme) + _ = corev1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) +} + +func TestNew(t *testing.T) { + defer logtesting.ClearAll() + kubeClient := fakekubeclientset.NewSimpleClientset() + eventingClient := fakeclientset.NewSimpleClientset() + eventingInformer := informers.NewSharedInformerFactory(eventingClient, 0) + kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + + containerSourceInformer := eventingInformer.Sources().V1alpha1().ContainerSources() + deploymentInformer := kubeInformer.Apps().V1().Deployments() + + c := NewController(reconciler.Options{ + KubeClientSet: kubeClient, + EventingClientSet: eventingClient, + Logger: logtesting.TestLogger(t), + }, + containerSourceInformer, + deploymentInformer, + ) + + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") + } +} + +func TestAllCases(t *testing.T) { + table := TableTest{ + { + Name: "bad workqueue key", + // Make sure Reconcile handles bad keys. + Key: "too/many/parts", + }, { + Name: "key not found", + // Make sure Reconcile handles good keys that don't exist. + Key: "foo/not-found", + }, { + Name: "missing sink", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + ), + }, + Key: testNS + "/" + sourceName, + WantErr: true, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "SetSinkURIFailed", `Failed to set Sink URI: channels.eventing.knative.dev "testsink" not found`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSinkNotFound(`Couldn't get Sink URI from "/testsink": channels.eventing.knative.dev "testsink" not found"`), + ), + }}, + }, { + Name: "sink not addressable", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &nonsinkRef, + }), + ), + NewTrigger(sinkName, testNS, ""), + }, + Key: testNS + "/" + sourceName, + WantErr: true, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "SetSinkURIFailed", `Failed to set Sink URI: sink "testnamespace/testsink" (eventing.knative.dev/v1alpha1, Kind=Trigger) does not contain address`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &nonsinkRef, + }), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSinkNotFound(`Couldn't get Sink URI from "/testsink": sink "testnamespace/testsink" (eventing.knative.dev/v1alpha1, Kind=Trigger) does not contain address"`), + ), + }}, + }, { + Name: "sink not ready", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + ), + NewChannel(sinkName, testNS), + }, + Key: testNS + "/" + sourceName, + WantErr: true, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "SetSinkURIFailed", `Failed to set Sink URI: sink "testnamespace/testsink" (eventing.knative.dev/v1alpha1, Kind=Channel) contains an empty hostname`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSinkNotFound(`Couldn't get Sink URI from "/testsink": sink "testnamespace/testsink" (eventing.knative.dev/v1alpha1, Kind=Channel) contains an empty hostname"`), + ), + }}, + }, { + Name: "sink is nil", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + }), + ), + }, + Key: testNS + "/" + sourceName, + WantErr: true, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "SetSinkURIFailed", `Failed to set Sink URI: Sink missing from spec`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + }), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSinkMissing("Sink missing from spec"), + ), + }}, + }, { + Name: "valid first pass", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + ), + NewChannel(sinkName, testNS, + WithChannelAddress(sinkDNS), + ), + }, + Key: testNS + "/" + sourceName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment ""`), // TODO on noes + Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSink(sinkURI), + WithContainerSourceDeploying(`Created deployment ""`), + ), + }}, + WantCreates: []metav1.Object{ + makeDeployment(NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + }), + WithContainerSourceUID(sourceUID), + ), 0, nil, nil), + }, + }, { + Name: "valid, with ready deployment", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + WithInitContainerSourceConditions, + WithContainerSourceSink(sinkURI), + WithContainerSourceDeploying(`Created deployment ""`), + ), + NewChannel(sinkName, testNS, + WithChannelAddress(sinkDNS), + ), + makeDeployment(NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + }), + WithContainerSourceUID(sourceUID), + ), 1, nil, nil), + }, + Key: testNS + "/" + sourceName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "DeploymentReady", `Deployment "" has 1 ready replicas`), + Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + WithInitContainerSourceConditions, + WithContainerSourceSink(sinkURI), + // Status Update: + WithContainerSourceDeployed, + ), + }}, + }, { + Name: "valid first pass, with annotations and labels", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + WithContainerSourceLabels(map[string]string{"label": "labeled"}), + WithContainerSourceAnnotations(map[string]string{"annotation": "annotated"}), + ), + NewChannel(sinkName, testNS, + WithChannelAddress(sinkDNS), + ), + }, + Key: testNS + "/" + sourceName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "DeploymentCreated", `Created deployment ""`), // TODO on noes + Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + WithContainerSourceLabels(map[string]string{"label": "labeled"}), + WithContainerSourceAnnotations(map[string]string{"annotation": "annotated"}), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSink(sinkURI), + WithContainerSourceDeploying(`Created deployment ""`), + ), + }}, + WantCreates: []metav1.Object{ + makeDeployment(NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + }), + WithContainerSourceUID(sourceUID), + ), 0, map[string]string{"label": "labeled"}, map[string]string{"annotation": "annotated"}), + }, + }, { + Name: "error for create deployment", + Objects: []runtime.Object{ + NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + ), + NewChannel(sinkName, testNS, + WithChannelAddress(sinkDNS), + ), + }, + Key: testNS + "/" + sourceName, + WantErr: true, + WantEvents: []string{ + Eventf(corev1.EventTypeWarning, "DeploymentCreateFailed", "Could not create deployment: inducing failure for create deployments"), + }, + WithReactors: []clientgotesting.ReactionFunc{ + InduceFailure("create", "deployments"), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + Sink: &sinkRef, + }), + WithContainerSourceUID(sourceUID), + // Status Update: + WithInitContainerSourceConditions, + WithContainerSourceSink(sinkURI), + WithContainerSourceDeployFailed(`Could not create deployment: inducing failure for create deployments`), + ), + }}, + WantCreates: []metav1.Object{ + makeDeployment(NewContainerSource(sourceName, testNS, + WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + Image: image, + }), + WithContainerSourceUID(sourceUID), + ), 0, nil, nil), + }, + }, + //{ // TODO: k8s service does not work, fix. + // Name: "valid, with sink as service", + // Objects: []runtime.Object{ + // NewContainerSource(sourceName, testNS, + // WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + // Image: image, + // Sink: &serviceRef, + // }), + // WithContainerSourceUID(sourceUID), + // ), + // NewService(sinkName, testNS), + // }, + // Key: testNS + "/" + sourceName, + // WantEvents: []string{ + // Eventf(corev1.EventTypeNormal, "ContainerSourceReconciled", `ContainerSource reconciled: "testnamespace/test-container-source"`), + // }, + // WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + // Object: NewContainerSource(sourceName, testNS, + // WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + // Image: image, + // Sink: &serviceRef, + // }), + // WithContainerSourceUID(sourceUID), + // // Status Update: + // WithInitContainerSourceConditions, + // WithContainerSourceSink(serviceURI), + // WithContainerSourceDeploying(`Created deployment ""`), + // ), + // }}, + // WantCreates: []metav1.Object{ + // makeDeployment(NewContainerSource(sourceName, testNS, + // WithContainerSourceSpec(sourcesv1alpha1.ContainerSourceSpec{ + // Image: image, + // }), + // WithContainerSourceUID(sourceUID), + // ), 0), + // }, + //}, + } + + defer logtesting.ClearAll() + table.Test(t, MakeFactory(func(listers *Listers, opt reconciler.Options) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + containerSourceLister: listers.GetContainerSourceLister(), + deploymentLister: listers.GetDeploymentLister(), + } + })) + +} + +func makeDeployment(source *sourcesv1alpha1.ContainerSource, replicas int32, labels map[string]string, annotations map[string]string) *appsv1.Deployment { + args := append(source.Spec.Args, fmt.Sprintf("--sink=%s", sinkURI)) + env := append(source.Spec.Env, corev1.EnvVar{Name: "SINK", Value: sinkURI}) + + annos := map[string]string{ + "sidecar.istio.io/inject": "true", + } + for k, v := range annotations { + annos[k] = v + } + + labs := map[string]string{ + "eventing.knative.dev/source": source.Name, + } + for k, v := range labels { + labs[k] = v + } + + return &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: fmt.Sprintf("%s-", source.Name), + Namespace: source.Namespace, + OwnerReferences: getOwnerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "eventing.knative.dev/source": source.Name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: annos, + Labels: labs, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: "source", + Image: source.Spec.Image, + Args: args, + Env: env, + ImagePullPolicy: corev1.PullIfNotPresent, + }}, + ServiceAccountName: source.Spec.ServiceAccountName, + }, + }, + }, + Status: appsv1.DeploymentStatus{ + ReadyReplicas: replicas, + }, + } +} + +func getOwnerReferences() []metav1.OwnerReference { + return []metav1.OwnerReference{{ + APIVersion: sourcesv1alpha1.SchemeGroupVersion.String(), + Kind: "ContainerSource", + Name: sourceName, + Controller: &trueVal, + BlockOwnerDeletion: &trueVal, + UID: sourceUID, + }} +} diff --git a/pkg/reconciler/containersource/doc.go b/pkg/reconciler/containersource/doc.go new file mode 100644 index 00000000000..fa81f4b7843 --- /dev/null +++ b/pkg/reconciler/containersource/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package containersource implements the ContainerSource controller. +package containersource diff --git a/pkg/reconciler/containersource/resources/arguments.go b/pkg/reconciler/containersource/resources/arguments.go new file mode 100644 index 00000000000..37fc1aef7ef --- /dev/null +++ b/pkg/reconciler/containersource/resources/arguments.go @@ -0,0 +1,36 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + corev1 "k8s.io/api/core/v1" +) + +type ContainerArguments struct { + Source *v1alpha1.ContainerSource + Name string + Namespace string + Image string + Args []string + Env []corev1.EnvVar + ServiceAccountName string + SinkInArgs bool + Sink string + Annotations map[string]string + Labels map[string]string +} diff --git a/pkg/reconciler/containersource/resources/deployment.go b/pkg/reconciler/containersource/resources/deployment.go new file mode 100644 index 00000000000..5664a54dcfb --- /dev/null +++ b/pkg/reconciler/containersource/resources/deployment.go @@ -0,0 +1,117 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + "github.com/knative/pkg/kmeta" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const sourceLabelKey = "eventing.knative.dev/source" + +func MakeDeployment(args ContainerArguments) *appsv1.Deployment { + + containerArgs := args.Args + + // if sink is already in the provided args.Args, don't attempt to add + if !args.SinkInArgs { + remote := fmt.Sprintf("--sink=%s", args.Sink) + containerArgs = append(containerArgs, remote) + } + + env := append(args.Env, corev1.EnvVar{Name: "SINK", Value: sinkArg(args)}) + + deploy := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: args.Name + "-", + Namespace: args.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *kmeta.NewControllerRef(args.Source), + }, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + sourceLabelKey: args.Name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "sidecar.istio.io/inject": "true", + }, + Labels: map[string]string{ + sourceLabelKey: args.Name, + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: args.ServiceAccountName, + Containers: []corev1.Container{ + { + Name: "source", + Image: args.Image, + Args: containerArgs, + Env: env, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + // Then wire through any annotations from the source. Not a bug by allowing + // the container to override Istio injection. + if args.Annotations != nil { + for k, v := range args.Annotations { + deploy.Spec.Template.ObjectMeta.Annotations[k] = v + } + } + + // Then wire through any labels from the source. Do not allow to override + // our source name. This seems like it would be way errorprone by allowing + // the matchlabels then to not match, or we'd have to force them to match, etc. + // just don't allow it. + if args.Labels != nil { + for k, v := range args.Labels { + if k != sourceLabelKey { + deploy.Spec.Template.ObjectMeta.Labels[k] = v + } + } + } + return deploy +} + +func sinkArg(args ContainerArguments) string { + if args.SinkInArgs { + for _, a := range args.Args { + if strings.HasPrefix(a, "--sink=") { + return strings.Replace(a, "--sink=", "", -1) + } + } + } + return args.Sink +} diff --git a/pkg/reconciler/containersource/resources/deployment_test.go b/pkg/reconciler/containersource/resources/deployment_test.go new file mode 100644 index 00000000000..0ac4a82d2fe --- /dev/null +++ b/pkg/reconciler/containersource/resources/deployment_test.go @@ -0,0 +1,329 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "testing" + + "github.com/google/go-cmp/cmp" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestMakeDeployment_sinkoverrideannotationlabelnotallowed(t *testing.T) { + yes := true + got := MakeDeployment(ContainerArguments{ + Source: &v1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{Name: "test-name", UID: "TEST_UID"}, + }, + Name: "test-name", + Namespace: "test-namespace", + Image: "test-image", + Args: []string{"--test1=args1", "--test2=args2"}, + Env: []corev1.EnvVar{{ + Name: "test1", + Value: "arg1", + }, { + Name: "test2", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "test2-secret", + }, + }, + }}, + ServiceAccountName: "test-service-account", + SinkInArgs: false, + Sink: "test-sink", + Labels: map[string]string{ + "eventing.knative.dev/source": "not-allowed", + "anotherlabel": "extra-label", + }, + Annotations: map[string]string{ + "sidecar.istio.io/inject": "false", + "anotherannotation": "extra-annotation", + }, + }) + + want := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-name-", + Namespace: "test-namespace", + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "sources.eventing.knative.dev/v1alpha1", + Kind: "ContainerSource", + Name: "test-name", + UID: "TEST_UID", + Controller: &yes, + BlockOwnerDeletion: &yes, + }}, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "eventing.knative.dev/source": "test-name", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "sidecar.istio.io/inject": "false", + "anotherannotation": "extra-annotation", + }, + Labels: map[string]string{ + "eventing.knative.dev/source": "test-name", + "anotherlabel": "extra-label", + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: "test-service-account", + Containers: []corev1.Container{ + { + Name: "source", + Image: "test-image", + Args: []string{ + "--test1=args1", + "--test2=args2", + "--sink=test-sink", + }, + Env: []corev1.EnvVar{ + { + Name: "test1", + Value: "arg1", + }, { + Name: "test2", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "test2-secret", + }, + }, + }, { + Name: "SINK", + Value: "test-sink", + }}, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected deploy (-want, +got) = %v", diff) + } +} + +func TestMakeDeployment_sink(t *testing.T) { + yes := true + got := MakeDeployment(ContainerArguments{ + Source: &v1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{Name: "test-name", UID: "TEST_UID"}, + }, + Name: "test-name", + Namespace: "test-namespace", + Image: "test-image", + Args: []string{"--test1=args1", "--test2=args2"}, + Env: []corev1.EnvVar{{ + Name: "test1", + Value: "arg1", + }, { + Name: "test2", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "test2-secret", + }, + }, + }}, + ServiceAccountName: "test-service-account", + SinkInArgs: false, + Sink: "test-sink", + }) + + want := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-name-", + Namespace: "test-namespace", + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "sources.eventing.knative.dev/v1alpha1", + Kind: "ContainerSource", + Name: "test-name", + UID: "TEST_UID", + Controller: &yes, + BlockOwnerDeletion: &yes, + }}, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "eventing.knative.dev/source": "test-name", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "sidecar.istio.io/inject": "true", + }, + Labels: map[string]string{ + "eventing.knative.dev/source": "test-name", + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: "test-service-account", + Containers: []corev1.Container{ + { + Name: "source", + Image: "test-image", + Args: []string{ + "--test1=args1", + "--test2=args2", + "--sink=test-sink", + }, + Env: []corev1.EnvVar{ + { + Name: "test1", + Value: "arg1", + }, { + Name: "test2", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "test2-secret", + }, + }, + }, { + Name: "SINK", + Value: "test-sink", + }}, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected deploy (-want, +got) = %v", diff) + } +} + +func TestMakeDeployment_sinkinargs(t *testing.T) { + yes := true + got := MakeDeployment(ContainerArguments{ + Source: &v1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{Name: "test-name", UID: "TEST_UID"}, + }, + Name: "test-name", + Namespace: "test-namespace", + Image: "test-image", + Args: []string{"--test1=args1", "--test2=args2", "--sink=test-sink"}, + Env: []corev1.EnvVar{{ + Name: "test1", + Value: "arg1", + }, { + Name: "test2", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "test2-secret", + }, + }, + }}, + ServiceAccountName: "test-service-account", + SinkInArgs: true, + Labels: map[string]string{"eventing.knative.dev/source": "test-name"}, + Annotations: map[string]string{"sidecar.istio.io/inject": "true"}, + }) + + want := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "test-name-", + Namespace: "test-namespace", + OwnerReferences: []metav1.OwnerReference{{ + APIVersion: "sources.eventing.knative.dev/v1alpha1", + Kind: "ContainerSource", + Name: "test-name", + UID: "TEST_UID", + Controller: &yes, + BlockOwnerDeletion: &yes, + }}, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "eventing.knative.dev/source": "test-name", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "sidecar.istio.io/inject": "true", + }, + Labels: map[string]string{ + "eventing.knative.dev/source": "test-name", + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: "test-service-account", + Containers: []corev1.Container{ + { + Name: "source", + Image: "test-image", + Args: []string{ + "--test1=args1", + "--test2=args2", + "--sink=test-sink", + }, + Env: []corev1.EnvVar{ + { + Name: "test1", + Value: "arg1", + }, { + Name: "test2", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + Key: "test2-secret", + }, + }, + }, { + Name: "SINK", + Value: "test-sink", + }}, + ImagePullPolicy: corev1.PullIfNotPresent, + }, + }, + }, + }, + }, + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected deploy (-want, +got) = %v", diff) + } +} diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 488f144ca5b..9c678d26c04 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -28,7 +28,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" - apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -116,7 +115,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { // Get the CronJobSource resource with this namespace/name original, err := r.cronjobLister.CronJobSources(namespace).Get(name) - if apierrs.IsNotFound(err) { + if apierrors.IsNotFound(err) { // The resource may no longer exist, in which case we stop processing. logging.FromContext(ctx).Error("CronJobSource key in work queue no longer exists", zap.Any("key", key)) return nil diff --git a/pkg/reconciler/cronjobsource/cronjobsource_test.go b/pkg/reconciler/cronjobsource/cronjobsource_test.go index a971cb1e4e9..ce3ed0ca5fd 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource_test.go +++ b/pkg/reconciler/cronjobsource/cronjobsource_test.go @@ -17,46 +17,33 @@ limitations under the License. package cronjobsource import ( - "github.com/knative/eventing/pkg/reconciler/cronjobsource/resources" - "github.com/knative/eventing/pkg/utils" - "k8s.io/apimachinery/pkg/runtime" "os" "testing" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubeinformers "k8s.io/client-go/informers" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" informers "github.com/knative/eventing/pkg/client/informers/externalversions" "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/cronjobsource/resources" + "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "github.com/knative/pkg/controller" logtesting "github.com/knative/pkg/logging/testing" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - //"k8s.io/apimachinery/pkg/runtime" - kubeinformers "k8s.io/client-go/informers" - fakekubeclientset "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/kubernetes/scheme" . "github.com/knative/eventing/pkg/reconciler/testing" . "github.com/knative/pkg/reconciler/testing" - - sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" - v1 "k8s.io/api/apps/v1" ) var ( - // deletionTime is used when objects are marked as deleted. Rfc3339Copy() - // truncates to seconds to match the loss of precision during serialization. - deletionTime = metav1.Now().Rfc3339Copy() - - trueVal = true - - sinkGVK = metav1.GroupVersionKind{ - Group: "eventing.knative.dev", - Version: "v1alpha1", - Kind: "Channel", - } sinkRef = corev1.ObjectReference{ Name: sinkName, Kind: "Channel", @@ -69,7 +56,6 @@ var ( const ( image = "github.com/knative/test/image" sourceName = "test-cronjob-source" - sourceUID = "1234-5678-90" testNS = "testnamespace" testSchedule = "*/2 * * * *" testData = "data" @@ -79,7 +65,7 @@ const ( func init() { // Add types to scheme - _ = v1.AddToScheme(scheme.Scheme) + _ = appsv1.AddToScheme(scheme.Scheme) _ = corev1.AddToScheme(scheme.Scheme) _ = duckv1alpha1.AddToScheme(scheme.Scheme) @@ -282,7 +268,7 @@ func TestNew(t *testing.T) { } } -func makeReceiveAdapter() *v1.Deployment { +func makeReceiveAdapter() *appsv1.Deployment { source := NewCronSourceJob(sourceName, testNS, WithCronJobSourceSpec(sourcesv1alpha1.CronJobSourceSpec{ Schedule: testSchedule, diff --git a/pkg/reconciler/testing/containersource.go b/pkg/reconciler/testing/containersource.go new file mode 100644 index 00000000000..bc43e909f90 --- /dev/null +++ b/pkg/reconciler/testing/containersource.go @@ -0,0 +1,112 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "k8s.io/apimachinery/pkg/types" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" +) + +// ContainerSourceOption enables further configuration of a CronJob. +type ContainerSourceOption func(*v1alpha1.ContainerSource) + +// NewCronJob creates a CronJob with CronJobOptions +func NewContainerSource(name, namespace string, o ...ContainerSourceOption) *v1alpha1.ContainerSource { + c := &v1alpha1.ContainerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, opt := range o { + opt(c) + } + //c.SetDefaults(context.Background()) // TODO: We should add defaults and validation. + return c +} + +func WithContainerSourceUID(uid types.UID) ContainerSourceOption { + return func(s *v1alpha1.ContainerSource) { + s.UID = uid + } +} + +// WithInitContainerSourceConditions initializes the ContainerSource's conditions. +func WithInitContainerSourceConditions(s *v1alpha1.ContainerSource) { + s.Status.InitializeConditions() +} + +func WithContainerSourceSinkNotFound(msg string) ContainerSourceOption { + return func(s *v1alpha1.ContainerSource) { + s.Status.MarkNoSink("NotFound", msg) + } +} + +func WithContainerSourceSinkMissing(msg string) ContainerSourceOption { + return func(s *v1alpha1.ContainerSource) { + s.Status.MarkNoSink("Missing", msg) + } +} + +func WithContainerSourceSink(uri string) ContainerSourceOption { + return func(s *v1alpha1.ContainerSource) { + s.Status.MarkSink(uri) + } +} + +func WithContainerSourceDeploying(msg string) ContainerSourceOption { + return func(s *v1alpha1.ContainerSource) { + s.Status.MarkDeploying("DeploymentCreated", msg) + } +} + +func WithContainerSourceDeployFailed(msg string) ContainerSourceOption { + return func(s *v1alpha1.ContainerSource) { + s.Status.MarkNotDeployed("DeploymentCreateFailed", msg) + } +} + +func WithContainerSourceDeployed(s *v1alpha1.ContainerSource) { + s.Status.MarkDeployed() +} + +func WithContainerSourceDeleted(c *v1alpha1.ContainerSource) { + t := metav1.NewTime(time.Unix(1e9, 0)) + c.ObjectMeta.SetDeletionTimestamp(&t) +} + +func WithContainerSourceSpec(spec v1alpha1.ContainerSourceSpec) ContainerSourceOption { + return func(c *v1alpha1.ContainerSource) { + c.Spec = spec + } +} + +func WithContainerSourceLabels(labels map[string]string) ContainerSourceOption { + return func(c *v1alpha1.ContainerSource) { + c.Labels = labels + } +} + +func WithContainerSourceAnnotations(annotations map[string]string) ContainerSourceOption { + return func(c *v1alpha1.ContainerSource) { + c.Annotations = annotations + } +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 8d634d3a91c..bf17f6dd67f 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -120,6 +120,10 @@ func (l *Listers) GetCronJobSourceLister() sourcelisters.CronJobSourceLister { return sourcelisters.NewCronJobSourceLister(l.indexerFor(&sourcesv1alpha1.CronJobSource{})) } +func (l *Listers) GetContainerSourceLister() sourcelisters.ContainerSourceLister { + return sourcelisters.NewContainerSourceLister(l.indexerFor(&sourcesv1alpha1.ContainerSource{})) +} + // GetGatewayLister gets lister for Istio Gateway resource. func (l *Listers) GetGatewayLister() istiolisters.GatewayLister { return istiolisters.NewGatewayLister(l.indexerFor(&istiov1alpha3.Gateway{}))