From 8dcada891e276dcf4bef6919c788a86584785c46 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Fri, 26 Apr 2019 11:36:27 -0400 Subject: [PATCH] move apiserver source to eventing --- Gopkg.lock | 10 + cmd/apiserver_receive_adapter/main.go | 136 ++++++++ cmd/sources-controller/main.go | 10 +- config/200-controller-clusterrole.yaml | 3 + config/300-apiserversource.yaml | 88 +++++ config/500-controller.yaml | 2 + config/500-sources-controller.yaml | 2 + pkg/adapter/apiserver/adapter.go | 129 +++++++ pkg/adapter/apiserver/adapter_test.go | 203 +++++++++++ .../sources/v1alpha1/apiserver_lifecycle.go | 66 ++++ .../v1alpha1/apiserver_lifecycle_test.go | 160 +++++++++ pkg/apis/sources/v1alpha1/apiserver_types.go | 101 ++++++ pkg/apis/sources/v1alpha1/register.go | 2 + pkg/apis/sources/v1alpha1/register_test.go | 2 + .../sources/v1alpha1/zz_generated.deepcopy.go | 120 +++++++ .../typed/sources/v1alpha1/apiserversource.go | 174 ++++++++++ .../v1alpha1/fake/fake_apiserversource.go | 140 ++++++++ .../v1alpha1/fake/fake_sources_client.go | 4 + .../sources/v1alpha1/generated_expansion.go | 2 + .../typed/sources/v1alpha1/sources_client.go | 5 + .../informers/externalversions/generic.go | 2 + .../sources/v1alpha1/apiserversource.go | 89 +++++ .../sources/v1alpha1/interface.go | 7 + .../sources/v1alpha1/apiserversource.go | 94 ++++++ .../sources/v1alpha1/expansion_generated.go | 8 + .../apiserversource/apiserversource.go | 276 +++++++++++++++ .../apiserversource/apiserversource_test.go | 202 +++++++++++ pkg/reconciler/apiserversource/doc.go | 18 + .../apiserversource/resources/labels.go | 30 ++ .../resources/receive_adapter.go | 117 +++++++ .../resources/receive_adapter_test.go | 127 +++++++ pkg/reconciler/testing/apiserversource.go | 73 ++++ pkg/reconciler/testing/listers.go | 4 + third_party/VENDOR-LICENSE | 25 ++ .../kelseyhightower/envconfig/LICENSE | 19 ++ .../kelseyhightower/envconfig/doc.go | 8 + .../kelseyhightower/envconfig/env_os.go | 7 + .../kelseyhightower/envconfig/env_syscall.go | 7 + .../kelseyhightower/envconfig/envconfig.go | 319 ++++++++++++++++++ .../kelseyhightower/envconfig/usage.go | 158 +++++++++ 40 files changed, 2948 insertions(+), 1 deletion(-) create mode 100644 cmd/apiserver_receive_adapter/main.go create mode 100644 config/300-apiserversource.yaml create mode 100644 pkg/adapter/apiserver/adapter.go create mode 100644 pkg/adapter/apiserver/adapter_test.go create mode 100644 pkg/apis/sources/v1alpha1/apiserver_lifecycle.go create mode 100644 pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go create mode 100644 pkg/apis/sources/v1alpha1/apiserver_types.go create mode 100644 pkg/client/clientset/versioned/typed/sources/v1alpha1/apiserversource.go create mode 100644 pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_apiserversource.go create mode 100644 pkg/client/informers/externalversions/sources/v1alpha1/apiserversource.go create mode 100644 pkg/client/listers/sources/v1alpha1/apiserversource.go create mode 100644 pkg/reconciler/apiserversource/apiserversource.go create mode 100644 pkg/reconciler/apiserversource/apiserversource_test.go create mode 100644 pkg/reconciler/apiserversource/doc.go create mode 100644 pkg/reconciler/apiserversource/resources/labels.go create mode 100644 pkg/reconciler/apiserversource/resources/receive_adapter.go create mode 100644 pkg/reconciler/apiserversource/resources/receive_adapter_test.go create mode 100644 pkg/reconciler/testing/apiserversource.go create mode 100644 vendor/github.com/kelseyhightower/envconfig/LICENSE create mode 100644 vendor/github.com/kelseyhightower/envconfig/doc.go create mode 100644 vendor/github.com/kelseyhightower/envconfig/env_os.go create mode 100644 vendor/github.com/kelseyhightower/envconfig/env_syscall.go create mode 100644 vendor/github.com/kelseyhightower/envconfig/envconfig.go create mode 100644 vendor/github.com/kelseyhightower/envconfig/usage.go diff --git a/Gopkg.lock b/Gopkg.lock index 07057341548..5fd48dab9cf 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -424,6 +424,14 @@ pruneopts = "NUT" revision = "f2b4162afba35581b6d4a50d3b8f34e33c144682" +[[projects]] + digest = "1:b8870bf2606dca65dc382f4cb8b7a434f17ff36a915451bda12788e9620be368" + name = "github.com/kelseyhightower/envconfig" + packages = ["."] + pruneopts = "NUT" + revision = "f611eb38b3875cc3bd991ca91c51d06446afa14c" + version = "v1.3.0" + [[projects]] digest = "1:57d04562d05dd4500ff1e7e47f2e62b9be0531388377a3b691a012ce70b210d5" name = "github.com/knative/pkg" @@ -1369,12 +1377,14 @@ "github.com/cloudevents/sdk-go", "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/client", + "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/fsnotify/fsnotify", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", + "github.com/kelseyhightower/envconfig", "github.com/knative/pkg/apis", "github.com/knative/pkg/apis/duck", "github.com/knative/pkg/apis/duck/v1alpha1", diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go new file mode 100644 index 00000000000..e7bc7004eed --- /dev/null +++ b/cmd/apiserver_receive_adapter/main.go @@ -0,0 +1,136 @@ +/* +Copyright 2019 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "flag" + "time" + + "k8s.io/client-go/rest" + + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + + "github.com/knative/eventing/pkg/reconciler" + + "github.com/kelseyhightower/envconfig" + "github.com/knative/eventing/pkg/adapter/apiserver" + "github.com/knative/eventing/pkg/kncloudevents" + "github.com/knative/pkg/apis/duck" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + kncontroller "github.com/knative/pkg/controller" + "github.com/knative/pkg/signals" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/clientcmd" +) + +var ( + masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") +) + +type envConfig struct { + SinkURI string `split_words:"true" required:"true"` + ApiVersion []string `split_words:"true" required:"true"` + Kind []string `required:"true"` + Controller []bool `required:"true"` +} + +func main() { + flag.Parse() + + logCfg := zap.NewProductionConfig() + logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + dlogger, err := logCfg.Build() + logger := dlogger.Sugar() + + var env envConfig + err = envconfig.Process("", &env) + if err != nil { + logger.Fatalw("Error processing environment", zap.Error(err)) + } + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig) + if err != nil { + logger.Fatalw("Error building kubeconfig", zap.Error(err)) + } + + logger = logger.With(zap.String("controller/apiserver", "adapter")) + logger.Info("Starting the controller") + + numControllers := len(env.ApiVersion) + cfg.QPS = float32(numControllers) * rest.DefaultQPS + cfg.Burst = numControllers * rest.DefaultBurst + opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) + + client, err := dynamic.NewForConfig(cfg) + if err != nil { + logger.Fatalw("Error building dynamic client", zap.Error(err)) + } + + eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI) + if err != nil { + logger.Fatalw("Error building cloud event client", zap.Error(err)) + } + + controllers := []*kncontroller.Impl{} + + // Create one controller per resource. + for i, apiVersion := range env.ApiVersion { + kind := env.Kind[i] + controlled := env.Controller[i] + + obj := &duckv1alpha1.AddressableType{} + + factory := duck.TypedInformerFactory{ + Client: client, + ResyncPeriod: time.Duration(10), // TODO + StopChannel: stopCh, + Type: obj, + } + + gv, err := schema.ParseGroupVersion(apiVersion) + if err != nil { + logger.Fatalw("Error parsing APIVersion", zap.Error(err)) + } + + gvk := schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version} + + // This is really bad. + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + + // Get and start the informer for gvr + logger.Infof("Starting informer for %v", gvk) + informer, lister, err := factory.Get(gvr) + if err != nil { + logger.Fatalw("Error starting informer", zap.Error(err)) + } + controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled)) + } + + // Start all of the controllers. + logger.Info("Starting controllers.") + go kncontroller.StartAll(stopCh, controllers...) + <-stopCh +} diff --git a/cmd/sources-controller/main.go b/cmd/sources-controller/main.go index 0868ebe9749..ebe79143ae9 100644 --- a/cmd/sources-controller/main.go +++ b/cmd/sources-controller/main.go @@ -33,6 +33,7 @@ import ( "github.com/knative/eventing/pkg/logconfig" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/apiserversource" "github.com/knative/eventing/pkg/reconciler/cronjobsource" "github.com/knative/pkg/configmap" kncontroller "github.com/knative/pkg/controller" @@ -65,7 +66,7 @@ func main() { logger = logger.With(zap.String("controller/impl", "pkg")) logger.Info("Starting the controller") - const numControllers = 2 + const numControllers = 3 cfg.QPS = numControllers * rest.DefaultQPS cfg.Burst = numControllers * rest.DefaultBurst opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) @@ -76,6 +77,7 @@ func main() { // Eventing cronJobSourceInformer := eventingInformerFactory.Sources().V1alpha1().CronJobSources() containerSourceInformer := eventingInformerFactory.Sources().V1alpha1().ContainerSources() + apiserverSourceInformer := eventingInformerFactory.Sources().V1alpha1().ApiServerSources() // Kube deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() @@ -94,6 +96,11 @@ func main() { containerSourceInformer, deploymentInformer, ), + apiserversource.NewController( + opt, + apiserverSourceInformer, + deploymentInformer, + ), } // This line asserts at compile time that the length of controllers is equal to numControllers. // It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that @@ -117,6 +124,7 @@ func main() { // Eventing cronJobSourceInformer.Informer(), containerSourceInformer.Informer(), + apiserverSourceInformer.Informer(), // Kube deploymentInformer.Informer(), ); err != nil { diff --git a/config/200-controller-clusterrole.yaml b/config/200-controller-clusterrole.yaml index d6772c6d6cd..37a4178fcd6 100644 --- a/config/200-controller-clusterrole.yaml +++ b/config/200-controller-clusterrole.yaml @@ -77,4 +77,7 @@ rules: - "containersources" - "containersources/status" - "containersources/finalizers" + - "apiserversources" + - "apiserversources/status" + - "apiserversources/finalizers" verbs: *everything diff --git a/config/300-apiserversource.yaml b/config/300-apiserversource.yaml new file mode 100644 index 00000000000..dfdc580e0ad --- /dev/null +++ b/config/300-apiserversource.yaml @@ -0,0 +1,88 @@ +# Copyright 2019 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + creationTimestamp: null + labels: + eventing.knative.dev/source: "true" + knative.dev/crd-install: "true" + name: apiserversources.sources.eventing.knative.dev +spec: + group: sources.eventing.knative.dev + names: + categories: + - all + - knative + - eventing + - sources + kind: ApiServerSource + plural: apiserversources + scope: Namespaced + subresources: + status: {} + validation: + openAPIV3Schema: + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + properties: + serviceAccountName: + type: string + sink: + type: object + resources: + items: + properties: + apiVersion: + type: string + kind: + type: string + type: array + required: + - resources + - sink + type: object + status: + properties: + conditions: + items: + properties: + lastTransitionTime: + type: string + message: + type: string + reason: + type: string + severity: + type: string + status: + type: string + type: + type: string + required: + - type + - status + type: object + type: array + sinkUri: + type: string + type: object + version: v1alpha1 diff --git a/config/500-controller.yaml b/config/500-controller.yaml index 2cb7d815c06..092945f9589 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -52,6 +52,8 @@ spec: value: eventing-broker-filter - name: CRONJOB_RA_IMAGE value: github.com/knative/eventing/cmd/cronjob_receive_adapter + - name: APISERVER_RA_IMAGE + value: github.com/knative/eventing/cmd/apiserver_receiver_adapter ports: - containerPort: 9090 name: metrics diff --git a/config/500-sources-controller.yaml b/config/500-sources-controller.yaml index 6f77f264418..f4998c16207 100644 --- a/config/500-sources-controller.yaml +++ b/config/500-sources-controller.yaml @@ -62,6 +62,8 @@ spec: # This is the Go import path for cron job receive adapter binary # that is containerized and substituted here. value: github.com/knative/eventing/cmd/cronjob_receive_adapter + - name: APISERVER_RA_IMAGE + value: github.com/knative/eventing/cmd/apiserver_receive_adapter volumes: - name: config-logging configMap: diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go new file mode 100644 index 00000000000..db14f450645 --- /dev/null +++ b/pkg/adapter/apiserver/adapter.go @@ -0,0 +1,129 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + + "github.com/cloudevents/sdk-go/pkg/cloudevents" + eventsclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + "github.com/knative/eventing/pkg/reconciler" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/controller" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/cache" +) + +const ( + // ReconcilerName is the name of the reconciler + ReconcilerName = "ApiServerSource" + + controllerAgentName = "apiserver-source-adapter-controller" + updateEventType = "dev.knative.apiserver.object.update" + deleteEventType = "dev.knative.apiserver.object.delete" +) + +// NewController initializes the controller and is called by the generated code +// Registers event handlers to enqueue events +func NewController( + opt reconciler.Options, + informer cache.SharedInformer, + lister cache.GenericLister, + eventsclient eventsclient.Client, + controlled bool) *controller.Impl { + + r := &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + lister: lister, + eventsClient: eventsclient, + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName, reconciler.MustNewStatsReporter(ReconcilerName, r.Logger)) + + r.Logger.Info("Setting up event handlers") + + if controlled { + informer.AddEventHandler(reconciler.Handler(impl.EnqueueControllerOf)) + } else { + informer.AddEventHandler(reconciler.Handler(impl.Enqueue)) + } + return impl +} + +// Reconciler reconciles an ApiServerSource object +type Reconciler struct { + *reconciler.Base + + eventsClient eventsclient.Client + lister cache.GenericLister +} + +// Reconcile sends a cloud event corresponding to the given key +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + r.Logger.Errorf("invalid resource key: %s", key) + return nil + } + + // Get the resource with this namespace/name + original, err := r.lister.ByNamespace(namespace).Get(name) + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + r.Logger.Error("resource key in work queue no longer exists", zap.Any("key", key)) + return nil + } else if err != nil { + return err + } + + object := original.(*duckv1alpha1.AddressableType) + + eventType := updateEventType + timestamp := object.GetCreationTimestamp() + if object.GetDeletionTimestamp() != nil { + eventType = deleteEventType + timestamp = *object.GetDeletionTimestamp() + } + + objectRef := corev1.ObjectReference{ + APIVersion: object.APIVersion, + Kind: object.Kind, + Name: object.GetName(), + Namespace: object.GetNamespace(), + } + + event := cloudevents.Event{ + Context: cloudevents.EventContextV02{ + ID: string(object.GetUID()), + Type: eventType, + Source: *types.ParseURLRef(object.GetSelfLink()), + Time: &types.Timestamp{Time: timestamp.Time}, + }.AsV02(), + Data: objectRef, + } + + if _, err := r.eventsClient.Send(ctx, event); err != nil { + r.Logger.Error("failed to send cloudevent (retrying)", err) + + return err + } + + return nil +} diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go new file mode 100644 index 00000000000..4cddd8d2f28 --- /dev/null +++ b/pkg/adapter/apiserver/adapter_test.go @@ -0,0 +1,203 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + gotesting "testing" + "time" + + "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json" + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/kncloudevents" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/pkg/apis/duck" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + logtesting "github.com/knative/pkg/logging/testing" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + fakedynamicclientset "k8s.io/client-go/dynamic/fake" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" +) + +const ( + sourceName = "test-apiserver-adapter" + sourceUID = "1234-5678-90" + testNS = "testnamespace" +) + +type testCase struct { + // Name is a descriptive name for this test suitable as a first argument to t.Run() + Name string + + // InitialState is the list of objects that already exists when reconciliation + // starts. + InitialState []runtime.Object + + // Key is the parameter to reconciliation. + // This has the form "namespace/name". + Key string + + // Where to send events + sink func(http.ResponseWriter, *http.Request) + + // Expected event data + data interface{} +} + +func TestReconcile(t *gotesting.T) { + table := []testCase{ + testCase{ + Name: "Receive Pod creation event", + InitialState: []runtime.Object{ + getPod(), + }, + Key: testNS + "/" + sourceName, + + sink: sinkAccepted, + data: decode(t, encode(t, getPodRef())), + }, + } + + for _, tc := range table { + t.Run(tc.Name, func(t *gotesting.T) { + // Create fake sink server + h := &fakeHandler{ + handler: tc.sink, + } + + sinkServer := httptest.NewServer(h) + defer sinkServer.Close() + + // Bind cloud event client + ceClient, err := kncloudevents.NewDefaultClient(sinkServer.URL) + if err != nil { + t.Errorf("cannot create cloud event client: %v", zap.Error(err)) + } + + // Create fake dynamic client + dynamicScheme := runtime.NewScheme() + client := fakedynamicclientset.NewSimpleDynamicClient(dynamicScheme, tc.InitialState...) + + stopCh := make(chan struct{}) + defer close(stopCh) + + tif := &duck.TypedInformerFactory{ + Client: client, + Type: &duckv1alpha1.AddressableType{}, + ResyncPeriod: 1 * time.Second, + StopChannel: stopCh, + } + + _, lister, err := tif.Get(schema.GroupVersionResource{Group: "", Resource: "pods", Version: "v1"}) + if err != nil { + t.Fatalf("Get() = %v", err) + } + + opt := reconciler.Options{ + KubeClientSet: fakekubeclientset.NewSimpleClientset(), + Logger: logtesting.TestLogger(t), + } + + r := &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + eventsClient: ceClient, + lister: lister, + } + ctx := context.Background() + + err = r.Reconcile(ctx, tc.Key) + if err != nil { + t.Errorf("Expected no error") + } + + if diff := cmp.Diff(tc.data, decode(t, h.body)); diff != "" { + t.Errorf("incorrect event (-want, +got): %v", diff) + } + }) + } + +} + +func getPod() runtime.Object { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": testNS, + "name": sourceName, + "selfLink": "/apis/v1/namespaces/" + testNS + "/pod/" + sourceName, + }, + }, + } +} + +func getPodRef() corev1.ObjectReference { + return corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Pod", + Name: sourceName, + Namespace: testNS, + } +} + +type fakeHandler struct { + body []byte + header http.Header + + handler func(http.ResponseWriter, *http.Request) +} + +func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.header = r.Header + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "can not read body", http.StatusBadRequest) + return + } + h.body = body + defer r.Body.Close() + h.handler(w, r) +} + +func sinkAccepted(writer http.ResponseWriter, req *http.Request) { + writer.WriteHeader(http.StatusOK) +} + +func encode(t *gotesting.T, data interface{}) string { + b, err := json.Encode(data) + if err != nil { + t.Fatalf("failed to encode data: %v", err) + } + return string(b) +} + +func decode(t *gotesting.T, data interface{}) interface{} { + var out interface{} + err := json.Decode(data, &out) + if err != nil { + t.Fatalf("failed to decode data: %v", err) + } + return out +} diff --git a/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go b/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go new file mode 100644 index 00000000000..b5c96f60023 --- /dev/null +++ b/pkg/apis/sources/v1alpha1/apiserver_lifecycle.go @@ -0,0 +1,66 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" +) + +// GetConditions returns Conditions +func (s *ApiServerSourceStatus) GetConditions() duckv1alpha1.Conditions { + return s.Conditions +} + +// SetConditions sets Conditions +func (s *ApiServerSourceStatus) SetConditions(conditions duckv1alpha1.Conditions) { + s.Conditions = conditions +} + +// GetCondition returns the condition currently associated with the given type, or nil. +func (s *ApiServerSourceStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { + return apiserverCondSet.Manage(s).GetCondition(t) +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (s *ApiServerSourceStatus) InitializeConditions() { + apiserverCondSet.Manage(s).InitializeConditions() +} + +// MarkSink sets the condition that the source has a sink configured. +func (s *ApiServerSourceStatus) MarkSink(uri string) { + s.SinkURI = uri + if len(uri) > 0 { + apiserverCondSet.Manage(s).MarkTrue(ApiServerConditionSinkProvided) + } else { + apiserverCondSet.Manage(s).MarkUnknown(ApiServerConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "") + } +} + +// MarkNoSink sets the condition that the source does not have a sink configured. +func (s *ApiServerSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) { + apiserverCondSet.Manage(s).MarkFalse(ApiServerConditionSinkProvided, reason, messageFormat, messageA...) +} + +// MarkDeployed sets the condition that the source has been deployed. +func (s *ApiServerSourceStatus) MarkDeployed() { + apiserverCondSet.Manage(s).MarkTrue(ApiServerConditionDeployed) +} + +// IsReady returns true if the resource is ready overall. +func (s *ApiServerSourceStatus) IsReady() bool { + return apiserverCondSet.Manage(s).IsHappy() +} diff --git a/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go b/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go new file mode 100644 index 00000000000..f6df49e1da2 --- /dev/null +++ b/pkg/apis/sources/v1alpha1/apiserver_lifecycle_test.go @@ -0,0 +1,160 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" +) + +func TestApiServerSourceStatusIsReady(t *testing.T) { + tests := []struct { + name string + s *ApiServerSourceStatus + want bool + }{{ + name: "uninitialized", + s: &ApiServerSourceStatus{}, + want: false, + }, { + name: "initialized", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + return s + }(), + want: false, + }, { + name: "mark deployed", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + s.MarkDeployed() + return s + }(), + want: false, + }, { + name: "mark sink", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + return s + }(), + want: false, + }, { + name: "mark sink and deployed", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + return s + }(), + want: true, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.IsReady() + if diff := cmp.Diff(test.want, got); diff != "" { + t.Errorf("%s: unexpected condition (-want, +got) = %v", test.name, diff) + } + }) + } +} + +func TestApiServerSourceStatusGetCondition(t *testing.T) { + tests := []struct { + name string + s *ApiServerSourceStatus + condQuery duckv1alpha1.ConditionType + want *duckv1alpha1.Condition + }{{ + name: "uninitialized", + s: &ApiServerSourceStatus{}, + condQuery: ApiServerConditionReady, + want: nil, + }, { + name: "initialized", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + return s + }(), + condQuery: ApiServerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ApiServerConditionReady, + Status: corev1.ConditionUnknown, + }, + }, { + name: "mark deployed", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + s.MarkDeployed() + return s + }(), + condQuery: ApiServerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ApiServerConditionReady, + Status: corev1.ConditionUnknown, + }, + }, { + name: "mark sink", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + return s + }(), + condQuery: ApiServerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ApiServerConditionReady, + Status: corev1.ConditionUnknown, + }, + }, { + name: "mark sink and deployed", + s: func() *ApiServerSourceStatus { + s := &ApiServerSourceStatus{} + s.InitializeConditions() + s.MarkSink("uri://example") + s.MarkDeployed() + return s + }(), + condQuery: ApiServerConditionReady, + want: &duckv1alpha1.Condition{ + Type: ApiServerConditionReady, + Status: corev1.ConditionTrue, + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := test.s.GetCondition(test.condQuery) + ignoreTime := cmpopts.IgnoreFields(duckv1alpha1.Condition{}, + "LastTransitionTime", "Severity") + if diff := cmp.Diff(test.want, got, ignoreTime); diff != "" { + t.Errorf("unexpected condition (-want, +got) = %v", diff) + } + }) + } +} diff --git a/pkg/apis/sources/v1alpha1/apiserver_types.go b/pkg/apis/sources/v1alpha1/apiserver_types.go new file mode 100644 index 00000000000..d3ae3b89947 --- /dev/null +++ b/pkg/apis/sources/v1alpha1/apiserver_types.go @@ -0,0 +1,101 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ApiServerSource is the Schema for the apiserversources API +// +k8s:openapi-gen=true +type ApiServerSource struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec ApiServerSourceSpec `json:"spec,omitempty"` + Status ApiServerSourceStatus `json:"status,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ApiServerSourceList contains a list of ApiServerSource +type ApiServerSourceList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []ApiServerSource `json:"items"` +} + +const ( + // ApiServerConditionReady has status True when the ApiServerSource is ready to send events. + ApiServerConditionReady = duckv1alpha1.ConditionReady + + // ApiServerConditionSinkProvided has status True when the ApiServerSource has been configured with a sink target. + ApiServerConditionSinkProvided duckv1alpha1.ConditionType = "SinkProvided" + + // ApiServerConditionDeployed has status True when the ApiServerSource has had it's deployment created. + ApiServerConditionDeployed duckv1alpha1.ConditionType = "Deployed" +) + +var apiserverCondSet = duckv1alpha1.NewLivingConditionSet( + ApiServerConditionSinkProvided, + ApiServerConditionDeployed, +) + +// ApiServerSourceSpec defines the desired state of ApiServerSource +type ApiServerSourceSpec struct { + // Resources is the list of resources to watch + Resources []ApiServerResource `json:"resources"` + + // ServiceAccountName is the name of the ServiceAccount to use to run this + // source. + // +optional + ServiceAccountName string `json:"serviceAccountName,omitempty"` + + // Sink is a reference to an object that will resolve to a domain name to use as the sink. + // +optional + Sink *corev1.ObjectReference `json:"sink,omitempty"` +} + +// ApiServerSourceStatus defines the observed state of ApiServerSource +type ApiServerSourceStatus struct { + // inherits duck/v1alpha1 Status, which currently provides: + // * ObservedGeneration - the 'Generation' of the Service that was last processed by the controller. + // * Conditions - the latest available observations of a resource's current state. + duckv1alpha1.Status `json:",inline"` + + // SinkURI is the current active sink URI that has been configured for the ApiServerSource. + // +optional + SinkURI string `json:"sinkUri,omitempty"` +} + +// ApiServerResource defines the resource to watch +type ApiServerResource struct { + // API version of the resource to watch. + APIVersion string `json:"apiVersion"` + + // Kind of the resource to watch. + // More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds + Kind string `json:"kind"` + + // If true, send an event referencing the object controlling the resource + Controller bool `json:"controller"` +} diff --git a/pkg/apis/sources/v1alpha1/register.go b/pkg/apis/sources/v1alpha1/register.go index 83556830ff3..cf1656669c7 100644 --- a/pkg/apis/sources/v1alpha1/register.go +++ b/pkg/apis/sources/v1alpha1/register.go @@ -49,6 +49,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &CronJobSourceList{}, &ContainerSource{}, &ContainerSourceList{}, + &ApiServerSource{}, + &ApiServerSourceList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/sources/v1alpha1/register_test.go b/pkg/apis/sources/v1alpha1/register_test.go index c111fda185c..19055cf4a66 100644 --- a/pkg/apis/sources/v1alpha1/register_test.go +++ b/pkg/apis/sources/v1alpha1/register_test.go @@ -64,6 +64,8 @@ func TestKnownTypes(t *testing.T) { "CronJobSourceList", "ContainerSource", "ContainerSourceList", + "ApiServerSource", + "ApiServerSourceList", } { if _, ok := types[name]; !ok { t.Errorf("Did not find %q as registered type", name) diff --git a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 29ee5bbe035..134c8cad298 100644 --- a/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -25,6 +25,126 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApiServerResource) DeepCopyInto(out *ApiServerResource) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApiServerResource. +func (in *ApiServerResource) DeepCopy() *ApiServerResource { + if in == nil { + return nil + } + out := new(ApiServerResource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApiServerSource) DeepCopyInto(out *ApiServerSource) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApiServerSource. +func (in *ApiServerSource) DeepCopy() *ApiServerSource { + if in == nil { + return nil + } + out := new(ApiServerSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ApiServerSource) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApiServerSourceList) DeepCopyInto(out *ApiServerSourceList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ApiServerSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApiServerSourceList. +func (in *ApiServerSourceList) DeepCopy() *ApiServerSourceList { + if in == nil { + return nil + } + out := new(ApiServerSourceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ApiServerSourceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApiServerSourceSpec) DeepCopyInto(out *ApiServerSourceSpec) { + *out = *in + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]ApiServerResource, len(*in)) + copy(*out, *in) + } + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(v1.ObjectReference) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApiServerSourceSpec. +func (in *ApiServerSourceSpec) DeepCopy() *ApiServerSourceSpec { + if in == nil { + return nil + } + out := new(ApiServerSourceSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ApiServerSourceStatus) DeepCopyInto(out *ApiServerSourceStatus) { + *out = *in + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ApiServerSourceStatus. +func (in *ApiServerSourceStatus) DeepCopy() *ApiServerSourceStatus { + if in == nil { + return nil + } + out := new(ApiServerSourceStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ContainerSource) DeepCopyInto(out *ContainerSource) { *out = *in diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/apiserversource.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/apiserversource.go new file mode 100644 index 00000000000..a54b43d33e3 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/apiserversource.go @@ -0,0 +1,174 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ApiServerSourcesGetter has a method to return a ApiServerSourceInterface. +// A group's client should implement this interface. +type ApiServerSourcesGetter interface { + ApiServerSources(namespace string) ApiServerSourceInterface +} + +// ApiServerSourceInterface has methods to work with ApiServerSource resources. +type ApiServerSourceInterface interface { + Create(*v1alpha1.ApiServerSource) (*v1alpha1.ApiServerSource, error) + Update(*v1alpha1.ApiServerSource) (*v1alpha1.ApiServerSource, error) + UpdateStatus(*v1alpha1.ApiServerSource) (*v1alpha1.ApiServerSource, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.ApiServerSource, error) + List(opts v1.ListOptions) (*v1alpha1.ApiServerSourceList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ApiServerSource, err error) + ApiServerSourceExpansion +} + +// apiServerSources implements ApiServerSourceInterface +type apiServerSources struct { + client rest.Interface + ns string +} + +// newApiServerSources returns a ApiServerSources +func newApiServerSources(c *SourcesV1alpha1Client, namespace string) *apiServerSources { + return &apiServerSources{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the apiServerSource, and returns the corresponding apiServerSource object, and an error if there is any. +func (c *apiServerSources) Get(name string, options v1.GetOptions) (result *v1alpha1.ApiServerSource, err error) { + result = &v1alpha1.ApiServerSource{} + err = c.client.Get(). + Namespace(c.ns). + Resource("apiserversources"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of ApiServerSources that match those selectors. +func (c *apiServerSources) List(opts v1.ListOptions) (result *v1alpha1.ApiServerSourceList, err error) { + result = &v1alpha1.ApiServerSourceList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("apiserversources"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested apiServerSources. +func (c *apiServerSources) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("apiserversources"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a apiServerSource and creates it. Returns the server's representation of the apiServerSource, and an error, if there is any. +func (c *apiServerSources) Create(apiServerSource *v1alpha1.ApiServerSource) (result *v1alpha1.ApiServerSource, err error) { + result = &v1alpha1.ApiServerSource{} + err = c.client.Post(). + Namespace(c.ns). + Resource("apiserversources"). + Body(apiServerSource). + Do(). + Into(result) + return +} + +// Update takes the representation of a apiServerSource and updates it. Returns the server's representation of the apiServerSource, and an error, if there is any. +func (c *apiServerSources) Update(apiServerSource *v1alpha1.ApiServerSource) (result *v1alpha1.ApiServerSource, err error) { + result = &v1alpha1.ApiServerSource{} + err = c.client.Put(). + Namespace(c.ns). + Resource("apiserversources"). + Name(apiServerSource.Name). + Body(apiServerSource). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *apiServerSources) UpdateStatus(apiServerSource *v1alpha1.ApiServerSource) (result *v1alpha1.ApiServerSource, err error) { + result = &v1alpha1.ApiServerSource{} + err = c.client.Put(). + Namespace(c.ns). + Resource("apiserversources"). + Name(apiServerSource.Name). + SubResource("status"). + Body(apiServerSource). + Do(). + Into(result) + return +} + +// Delete takes name of the apiServerSource and deletes it. Returns an error if one occurs. +func (c *apiServerSources) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("apiserversources"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *apiServerSources) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("apiserversources"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched apiServerSource. +func (c *apiServerSources) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ApiServerSource, err error) { + result = &v1alpha1.ApiServerSource{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("apiserversources"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_apiserversource.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_apiserversource.go new file mode 100644 index 00000000000..5933ac37691 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_apiserversource.go @@ -0,0 +1,140 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeApiServerSources implements ApiServerSourceInterface +type FakeApiServerSources struct { + Fake *FakeSourcesV1alpha1 + ns string +} + +var apiserversourcesResource = schema.GroupVersionResource{Group: "sources.eventing.knative.dev", Version: "v1alpha1", Resource: "apiserversources"} + +var apiserversourcesKind = schema.GroupVersionKind{Group: "sources.eventing.knative.dev", Version: "v1alpha1", Kind: "ApiServerSource"} + +// Get takes name of the apiServerSource, and returns the corresponding apiServerSource object, and an error if there is any. +func (c *FakeApiServerSources) Get(name string, options v1.GetOptions) (result *v1alpha1.ApiServerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(apiserversourcesResource, c.ns, name), &v1alpha1.ApiServerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ApiServerSource), err +} + +// List takes label and field selectors, and returns the list of ApiServerSources that match those selectors. +func (c *FakeApiServerSources) List(opts v1.ListOptions) (result *v1alpha1.ApiServerSourceList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(apiserversourcesResource, apiserversourcesKind, c.ns, opts), &v1alpha1.ApiServerSourceList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.ApiServerSourceList{ListMeta: obj.(*v1alpha1.ApiServerSourceList).ListMeta} + for _, item := range obj.(*v1alpha1.ApiServerSourceList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested apiServerSources. +func (c *FakeApiServerSources) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(apiserversourcesResource, c.ns, opts)) + +} + +// Create takes the representation of a apiServerSource and creates it. Returns the server's representation of the apiServerSource, and an error, if there is any. +func (c *FakeApiServerSources) Create(apiServerSource *v1alpha1.ApiServerSource) (result *v1alpha1.ApiServerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(apiserversourcesResource, c.ns, apiServerSource), &v1alpha1.ApiServerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ApiServerSource), err +} + +// Update takes the representation of a apiServerSource and updates it. Returns the server's representation of the apiServerSource, and an error, if there is any. +func (c *FakeApiServerSources) Update(apiServerSource *v1alpha1.ApiServerSource) (result *v1alpha1.ApiServerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(apiserversourcesResource, c.ns, apiServerSource), &v1alpha1.ApiServerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ApiServerSource), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeApiServerSources) UpdateStatus(apiServerSource *v1alpha1.ApiServerSource) (*v1alpha1.ApiServerSource, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(apiserversourcesResource, "status", c.ns, apiServerSource), &v1alpha1.ApiServerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ApiServerSource), err +} + +// Delete takes name of the apiServerSource and deletes it. Returns an error if one occurs. +func (c *FakeApiServerSources) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(apiserversourcesResource, c.ns, name), &v1alpha1.ApiServerSource{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeApiServerSources) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(apiserversourcesResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.ApiServerSourceList{}) + return err +} + +// Patch applies the patch and returns the patched apiServerSource. +func (c *FakeApiServerSources) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ApiServerSource, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(apiserversourcesResource, c.ns, name, data, subresources...), &v1alpha1.ApiServerSource{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.ApiServerSource), err +} diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go index 2742a8e7195..b244bc3539e 100644 --- a/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/fake/fake_sources_client.go @@ -28,6 +28,10 @@ type FakeSourcesV1alpha1 struct { *testing.Fake } +func (c *FakeSourcesV1alpha1) ApiServerSources(namespace string) v1alpha1.ApiServerSourceInterface { + return &FakeApiServerSources{c, namespace} +} + func (c *FakeSourcesV1alpha1) ContainerSources(namespace string) v1alpha1.ContainerSourceInterface { return &FakeContainerSources{c, namespace} } diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go index b250cd0c5e3..90d7d66818c 100644 --- a/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/generated_expansion.go @@ -18,6 +18,8 @@ limitations under the License. package v1alpha1 +type ApiServerSourceExpansion interface{} + type ContainerSourceExpansion interface{} type CronJobSourceExpansion interface{} diff --git a/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go b/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go index dbbdbdf0392..c63c6e26b69 100644 --- a/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go +++ b/pkg/client/clientset/versioned/typed/sources/v1alpha1/sources_client.go @@ -27,6 +27,7 @@ import ( type SourcesV1alpha1Interface interface { RESTClient() rest.Interface + ApiServerSourcesGetter ContainerSourcesGetter CronJobSourcesGetter } @@ -36,6 +37,10 @@ type SourcesV1alpha1Client struct { restClient rest.Interface } +func (c *SourcesV1alpha1Client) ApiServerSources(namespace string) ApiServerSourceInterface { + return newApiServerSources(c, namespace) +} + func (c *SourcesV1alpha1Client) ContainerSources(namespace string) ContainerSourceInterface { return newContainerSources(c, namespace) } diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index fbcb3b3bcba..4ba6d95cab5 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -68,6 +68,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Eventing().V1alpha1().Triggers().Informer()}, nil // Group=sources.eventing.knative.dev, Version=v1alpha1 + case sourcesv1alpha1.SchemeGroupVersion.WithResource("apiserversources"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Sources().V1alpha1().ApiServerSources().Informer()}, nil case sourcesv1alpha1.SchemeGroupVersion.WithResource("containersources"): return &genericInformer{resource: resource.GroupResource(), informer: f.Sources().V1alpha1().ContainerSources().Informer()}, nil case sourcesv1alpha1.SchemeGroupVersion.WithResource("cronjobsources"): diff --git a/pkg/client/informers/externalversions/sources/v1alpha1/apiserversource.go b/pkg/client/informers/externalversions/sources/v1alpha1/apiserversource.go new file mode 100644 index 00000000000..9b99c4a78e3 --- /dev/null +++ b/pkg/client/informers/externalversions/sources/v1alpha1/apiserversource.go @@ -0,0 +1,89 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/sources/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ApiServerSourceInformer provides access to a shared informer and lister for +// ApiServerSources. +type ApiServerSourceInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.ApiServerSourceLister +} + +type apiServerSourceInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewApiServerSourceInformer constructs a new informer for ApiServerSource type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewApiServerSourceInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredApiServerSourceInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredApiServerSourceInformer constructs a new informer for ApiServerSource type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredApiServerSourceInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SourcesV1alpha1().ApiServerSources(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.SourcesV1alpha1().ApiServerSources(namespace).Watch(options) + }, + }, + &sourcesv1alpha1.ApiServerSource{}, + resyncPeriod, + indexers, + ) +} + +func (f *apiServerSourceInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredApiServerSourceInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *apiServerSourceInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&sourcesv1alpha1.ApiServerSource{}, f.defaultInformer) +} + +func (f *apiServerSourceInformer) Lister() v1alpha1.ApiServerSourceLister { + return v1alpha1.NewApiServerSourceLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/sources/v1alpha1/interface.go b/pkg/client/informers/externalversions/sources/v1alpha1/interface.go index 9647603c828..ea02ad62be7 100644 --- a/pkg/client/informers/externalversions/sources/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/sources/v1alpha1/interface.go @@ -24,6 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { + // ApiServerSources returns a ApiServerSourceInformer. + ApiServerSources() ApiServerSourceInformer // ContainerSources returns a ContainerSourceInformer. ContainerSources() ContainerSourceInformer // CronJobSources returns a CronJobSourceInformer. @@ -41,6 +43,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } +// ApiServerSources returns a ApiServerSourceInformer. +func (v *version) ApiServerSources() ApiServerSourceInformer { + return &apiServerSourceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // ContainerSources returns a ContainerSourceInformer. func (v *version) ContainerSources() ContainerSourceInformer { return &containerSourceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/listers/sources/v1alpha1/apiserversource.go b/pkg/client/listers/sources/v1alpha1/apiserversource.go new file mode 100644 index 00000000000..ac970ca4ad0 --- /dev/null +++ b/pkg/client/listers/sources/v1alpha1/apiserversource.go @@ -0,0 +1,94 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ApiServerSourceLister helps list ApiServerSources. +type ApiServerSourceLister interface { + // List lists all ApiServerSources in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.ApiServerSource, err error) + // ApiServerSources returns an object that can list and get ApiServerSources. + ApiServerSources(namespace string) ApiServerSourceNamespaceLister + ApiServerSourceListerExpansion +} + +// apiServerSourceLister implements the ApiServerSourceLister interface. +type apiServerSourceLister struct { + indexer cache.Indexer +} + +// NewApiServerSourceLister returns a new ApiServerSourceLister. +func NewApiServerSourceLister(indexer cache.Indexer) ApiServerSourceLister { + return &apiServerSourceLister{indexer: indexer} +} + +// List lists all ApiServerSources in the indexer. +func (s *apiServerSourceLister) List(selector labels.Selector) (ret []*v1alpha1.ApiServerSource, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.ApiServerSource)) + }) + return ret, err +} + +// ApiServerSources returns an object that can list and get ApiServerSources. +func (s *apiServerSourceLister) ApiServerSources(namespace string) ApiServerSourceNamespaceLister { + return apiServerSourceNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// ApiServerSourceNamespaceLister helps list and get ApiServerSources. +type ApiServerSourceNamespaceLister interface { + // List lists all ApiServerSources in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.ApiServerSource, err error) + // Get retrieves the ApiServerSource from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.ApiServerSource, error) + ApiServerSourceNamespaceListerExpansion +} + +// apiServerSourceNamespaceLister implements the ApiServerSourceNamespaceLister +// interface. +type apiServerSourceNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all ApiServerSources in the indexer for a given namespace. +func (s apiServerSourceNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.ApiServerSource, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.ApiServerSource)) + }) + return ret, err +} + +// Get retrieves the ApiServerSource from the indexer for a given namespace and name. +func (s apiServerSourceNamespaceLister) Get(name string) (*v1alpha1.ApiServerSource, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("apiserversource"), name) + } + return obj.(*v1alpha1.ApiServerSource), nil +} diff --git a/pkg/client/listers/sources/v1alpha1/expansion_generated.go b/pkg/client/listers/sources/v1alpha1/expansion_generated.go index 49fe2bab0ae..5f68e073e54 100644 --- a/pkg/client/listers/sources/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/sources/v1alpha1/expansion_generated.go @@ -18,6 +18,14 @@ limitations under the License. package v1alpha1 +// ApiServerSourceListerExpansion allows custom methods to be added to +// ApiServerSourceLister. +type ApiServerSourceListerExpansion interface{} + +// ApiServerSourceNamespaceListerExpansion allows custom methods to be added to +// ApiServerSourceNamespaceLister. +type ApiServerSourceNamespaceListerExpansion interface{} + // ContainerSourceListerExpansion allows custom methods to be added to // ContainerSourceLister. type ContainerSourceListerExpansion interface{} diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go new file mode 100644 index 00000000000..38d6b9d5f74 --- /dev/null +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -0,0 +1,276 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserversource + +import ( + "context" + "fmt" + "os" + "reflect" + "sync" + "time" + + v1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + sourceinformers "github.com/knative/eventing/pkg/client/informers/externalversions/sources/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/sources/v1alpha1" + "github.com/knative/eventing/pkg/duck" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/apiserversource/resources" + "github.com/knative/pkg/controller" + "github.com/knative/pkg/logging" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + appsv1informers "k8s.io/client-go/informers/apps/v1" + appsv1listers "k8s.io/client-go/listers/apps/v1" + "k8s.io/client-go/tools/cache" +) + +const ( + // ReconcilerName is the name of the reconciler + ReconcilerName = "ApiServerSources" + + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "apiserver-source-controller" + + // Name of the corev1.Events emitted from the reconciliation process + apiserversourceReconciled = "ApiServerSourceReconciled" + apiserversourceUpdateStatusFailed = "ApiServerSourceUpdateStatusFailed" + + // raImageEnvVar is the name of the environment variable that contains the receive adapter's + // image. It must be defined. + raImageEnvVar = "APISERVER_RA_IMAGE" +) + +// Reconciler reconciles a ApiServerSource object +type Reconciler struct { + *reconciler.Base + + receiveAdapterImage string + once sync.Once + + // listers index properties about resources + apiserversourceLister listers.ApiServerSourceLister + deploymentLister appsv1listers.DeploymentLister +} + +// NewController initializes the controller and is called by the generated code +// Registers event handlers to enqueue events +func NewController( + opt reconciler.Options, + apiserversourceInformer sourceinformers.ApiServerSourceInformer, + deploymentInformer appsv1informers.DeploymentInformer, +) *controller.Impl { + r := &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + apiserversourceLister: apiserversourceInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), + } + impl := controller.NewImpl(r, r.Logger, ReconcilerName, reconciler.MustNewStatsReporter(ReconcilerName, r.Logger)) + + r.Logger.Info("Setting up event handlers") + apiserversourceInformer.Informer().AddEventHandler(reconciler.Handler(impl.Enqueue)) + + deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("ApiServerSource")), + Handler: reconciler.Handler(impl.EnqueueControllerOf), + }) + + return impl +} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the ApiServerSource +// resource with the current status of the resource. +func (r *Reconciler) Reconcile(ctx context.Context, key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + r.Logger.Errorf("invalid resource key: %s", key) + return nil + } + + // Get the ApiServerSource resource with this namespace/name + original, err := r.apiserversourceLister.ApiServerSources(namespace).Get(name) + if apierrors.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("ApiServerSource key in work queue no longer exists", zap.Any("key", key)) + return nil + } else if err != nil { + return err + } + + // Don't modify the informers copy + apiserversource := original.DeepCopy() + + // Reconcile this copy of the ApiServerSource and then write back any status + // updates regardless of whether the reconcile error out. + err = r.reconcile(ctx, apiserversource) + if err != nil { + logging.FromContext(ctx).Warn("Error reconciling ApiServerSource", zap.Error(err)) + } else { + logging.FromContext(ctx).Debug("ApiServerSource reconciled") + r.Recorder.Eventf(apiserversource, corev1.EventTypeNormal, apiserversourceReconciled, `ApiServerSource reconciled: "%s/%s"`, apiserversource.Namespace, apiserversource.Name) + } + + if _, updateStatusErr := r.updateStatus(ctx, apiserversource.DeepCopy()); updateStatusErr != nil { + logging.FromContext(ctx).Warn("Failed to update the ApiServerSource", zap.Error(err)) + r.Recorder.Eventf(apiserversource, corev1.EventTypeWarning, apiserversourceUpdateStatusFailed, "Failed to update ApiServerSource's status: %v", err) + return updateStatusErr + } + + // Requeue if the resource is not ready: + return err +} + +func (r *Reconciler) reconcile(ctx context.Context, source *v1alpha1.ApiServerSource) error { + source.Status.InitializeConditions() + + sinkURI, err := duck.GetSinkURI(ctx, r.DynamicClientSet, source.Spec.Sink, source.Namespace) + if err != nil { + source.Status.MarkNoSink("NotFound", "") + return err + } + source.Status.MarkSink(sinkURI) + + _, err = r.createReceiveAdapter(ctx, source, sinkURI) + if err != nil { + r.Logger.Error("Unable to create the receive adapter", zap.Error(err)) + return err + } + + // Update source status + source.Status.MarkDeployed() + return nil +} + +func (r *Reconciler) getReceiveAdapterImage() string { + if r.receiveAdapterImage == "" { + r.once.Do(func() { + raImage, defined := os.LookupEnv(raImageEnvVar) + if !defined { + panic(fmt.Errorf("required environment variable %q not defined", raImageEnvVar)) + } + r.receiveAdapterImage = raImage + }) + } + return r.receiveAdapterImage +} + +func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.ApiServerSource, sinkURI string) (*appsv1.Deployment, error) { + ra, err := r.getReceiveAdapter(ctx, src) + if err != nil && !apierrors.IsNotFound(err) { + logging.FromContext(ctx).Error("Unable to get an existing receive adapter", zap.Error(err)) + return nil, err + } + if ra != nil { + logging.FromContext(ctx).Desugar().Info("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) + return ra, nil + } + adapterArgs := resources.ReceiveAdapterArgs{ + Image: r.getReceiveAdapterImage(), + Source: src, + Labels: resources.Labels(src.Name), + SinkURI: sinkURI, + } + expected := resources.MakeReceiveAdapter(&adapterArgs) + if ra != nil { + if r.podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) { + ra.Spec.Template.Spec = expected.Spec.Template.Spec + if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil { + return ra, err + } + logging.FromContext(ctx).Desugar().Info("Receive Adapter updated.", zap.Any("receiveAdapter", ra)) + } else { + logging.FromContext(ctx).Desugar().Info("Reusing existing receive adapter", zap.Any("receiveAdapter", ra)) + } + return ra, nil + } + + if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected); err != nil { + return nil, err + } + logging.FromContext(ctx).Desugar().Info("Receive Adapter created.", zap.Any("receiveAdapter", expected)) + return ra, err +} + +func (r *Reconciler) podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { + if !equality.Semantic.DeepDerivative(newPodSpec, oldPodSpec) { + return true + } + if len(oldPodSpec.Containers) != len(newPodSpec.Containers) { + return true + } + for i := range newPodSpec.Containers { + if !equality.Semantic.DeepEqual(newPodSpec.Containers[i].Env, oldPodSpec.Containers[i].Env) { + return true + } + } + return false +} + +func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.ApiServerSource) (*appsv1.Deployment, error) { + dl, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).List(metav1.ListOptions{ + LabelSelector: r.getLabelSelector(src).String(), + }) + if err != nil { + logging.FromContext(ctx).Desugar().Error("Unable to list deployments: %v", zap.Error(err)) + return nil, err + } + for _, dep := range dl.Items { + if metav1.IsControlledBy(&dep, src) { + return &dep, nil + } + } + return nil, apierrors.NewNotFound(schema.GroupResource{}, "") +} + +func (r *Reconciler) getLabelSelector(src *v1alpha1.ApiServerSource) labels.Selector { + return labels.SelectorFromSet(resources.Labels(src.Name)) +} + +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.ApiServerSource) (*v1alpha1.ApiServerSource, error) { + apiserversource, err := r.apiserversourceLister.ApiServerSources(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + // If there's nothing to update, just return. + if reflect.DeepEqual(apiserversource.Status, desired.Status) { + return apiserversource, nil + } + + becomesReady := desired.Status.IsReady() && !apiserversource.Status.IsReady() + + // Don't modify the informers copy. + existing := apiserversource.DeepCopy() + existing.Status = desired.Status + + cj, err := r.EventingClientSet.SourcesV1alpha1().ApiServerSources(desired.Namespace).UpdateStatus(existing) + if err == nil && becomesReady { + duration := time.Since(cj.ObjectMeta.CreationTimestamp.Time) + r.Logger.Infof("ApiServerSource %q became ready after %v", apiserversource.Name, duration) + } + + return cj, err +} diff --git a/pkg/reconciler/apiserversource/apiserversource_test.go b/pkg/reconciler/apiserversource/apiserversource_test.go new file mode 100644 index 00000000000..f93a4b9afd9 --- /dev/null +++ b/pkg/reconciler/apiserversource/apiserversource_test.go @@ -0,0 +1,202 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserversource + +import ( + "os" + "testing" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + kubeinformers "k8s.io/client-go/informers" + fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" + + sourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/eventing/pkg/reconciler/apiserversource/resources" + "github.com/knative/eventing/pkg/utils" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/controller" + logtesting "github.com/knative/pkg/logging/testing" + + . "github.com/knative/eventing/pkg/reconciler/testing" + . "github.com/knative/pkg/reconciler/testing" +) + +var ( + sinkRef = corev1.ObjectReference{ + Name: sinkName, + Kind: "Channel", + APIVersion: "eventing.knative.dev/v1alpha1", + } + sinkDNS = "sink.mynamespace.svc." + utils.GetClusterDomainName() + sinkURI = "http://" + sinkDNS + "/" +) + +const ( + image = "github.com/knative/test/image" + sourceName = "test-apiserver-source" + sourceUID = "1234-5678-90" + testNS = "testnamespace" + + sinkName = "testsink" +) + +func init() { + // Add types to scheme + _ = appsv1.AddToScheme(scheme.Scheme) + _ = corev1.AddToScheme(scheme.Scheme) + _ = duckv1alpha1.AddToScheme(scheme.Scheme) + + _ = os.Setenv("APISERVER_RA_IMAGE", image) +} + +func TestReconcile(t *testing.T) { + table := TableTest{ + { + Name: "missing sink", + Objects: []runtime.Object{ + NewApiServerSource(sourceName, testNS, + WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ + Sink: &sinkRef, + }), + ), + }, + Key: testNS + "/" + sourceName, + WantErr: true, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewApiServerSource(sourceName, testNS, + WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ + Sink: &sinkRef, + }), + // Status Update: + WithInitApiServerSourceConditions, + WithApiServerSourceSinkNotFound, + ), + }}, + }, + { + Name: "valid", + Objects: []runtime.Object{ + NewApiServerSource(sourceName, testNS, + WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ + Resources: []sourcesv1alpha1.ApiServerResource{ + sourcesv1alpha1.ApiServerResource{ + APIVersion: "", + Kind: "Namespace", + }, + }, + Sink: &sinkRef, + }), + ), + NewChannel(sinkName, testNS, + WithInitChannelConditions, + WithChannelAddress(sinkDNS), + ), + }, + Key: testNS + "/" + sourceName, + WantEvents: []string{ + Eventf(corev1.EventTypeNormal, "ApiServerSourceReconciled", `ApiServerSource reconciled: "%s/%s"`, testNS, sourceName), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewApiServerSource(sourceName, testNS, + WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ + Resources: []sourcesv1alpha1.ApiServerResource{ + sourcesv1alpha1.ApiServerResource{ + APIVersion: "", + Kind: "Namespace", + }, + }, + Sink: &sinkRef, + }), + // Status Update: + WithInitApiServerSourceConditions, + WithApiServerSourceDeployed, + WithApiServerSourceSink(sinkURI), + ), + }}, + WantCreates: []metav1.Object{ + makeReceiveAdapter(), + }, + }, + } + + defer logtesting.ClearAll() + table.Test(t, MakeFactory(func(listers *Listers, opt reconciler.Options) controller.Reconciler { + return &Reconciler{ + Base: reconciler.NewBase(opt, controllerAgentName), + apiserversourceLister: listers.GetApiServerSourceLister(), + deploymentLister: listers.GetDeploymentLister(), + } + })) +} +func TestNew(t *testing.T) { + defer logtesting.ClearAll() + kubeClient := fakekubeclientset.NewSimpleClientset() + eventingClient := fakeclientset.NewSimpleClientset() + eventingInformer := informers.NewSharedInformerFactory(eventingClient, 0) + kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + + apiserverInformer := eventingInformer.Sources().V1alpha1().ApiServerSources() + deploymentInformer := kubeInformer.Apps().V1().Deployments() + + c := NewController(reconciler.Options{ + KubeClientSet: kubeClient, + EventingClientSet: eventingClient, + Logger: logtesting.TestLogger(t), + }, + apiserverInformer, + deploymentInformer, + ) + + if c == nil { + t.Fatal("Expected NewController to return a non-nil value") + } +} + +func makeReceiveAdapter() *appsv1.Deployment { + source := NewApiServerSource(sourceName, testNS, + WithApiServerSourceSpec(sourcesv1alpha1.ApiServerSourceSpec{ + Resources: []sourcesv1alpha1.ApiServerResource{ + sourcesv1alpha1.ApiServerResource{ + APIVersion: "", + Kind: "Namespace", + }, + }, + Sink: &sinkRef, + }, + ), + // Status Update: + WithInitApiServerSourceConditions, + WithApiServerSourceDeployed, + WithApiServerSourceSink(sinkURI), + ) + + args := resources.ReceiveAdapterArgs{ + Image: image, + Source: source, + Labels: resources.Labels(sourceName), + SinkURI: sinkURI, + } + return resources.MakeReceiveAdapter(&args) +} diff --git a/pkg/reconciler/apiserversource/doc.go b/pkg/reconciler/apiserversource/doc.go new file mode 100644 index 00000000000..02638d2eb7c --- /dev/null +++ b/pkg/reconciler/apiserversource/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package apiserversource implements the ApiSource controller. +package apiserversource diff --git a/pkg/reconciler/apiserversource/resources/labels.go b/pkg/reconciler/apiserversource/resources/labels.go new file mode 100644 index 00000000000..1bcb28668e2 --- /dev/null +++ b/pkg/reconciler/apiserversource/resources/labels.go @@ -0,0 +1,30 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "apiserver-source-controller" +) + +func Labels(name string) map[string]string { + return map[string]string{ + "knative-eventing-source": controllerAgentName, + "knative-eventing-source-name": name, + } +} diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go new file mode 100644 index 00000000000..24cbadce897 --- /dev/null +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -0,0 +1,117 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// ReceiveAdapterArgs are the arguments needed to create a ApiServer Receive Adapter. +// Every field is required. +type ReceiveAdapterArgs struct { + Image string + Source *v1alpha1.ApiServerSource + Labels map[string]string + SinkURI string +} + +// MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for +// ApiServer Sources. +func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { + replicas := int32(1) + return &v1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: args.Source.Namespace, + GenerateName: fmt.Sprintf("apiserver-%s-", args.Source.Name), + Labels: args.Labels, + }, + Spec: v1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: args.Labels, + }, + Replicas: &replicas, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "sidecar.istio.io/inject": "true", + }, + Labels: args.Labels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: args.Source.Spec.ServiceAccountName, + Containers: []corev1.Container{ + { + Name: "receive-adapter", + Image: args.Image, + Env: makeEnv(args.SinkURI, &args.Source.Spec), + }, + }, + }, + }, + }, + } +} + +func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar { + apiversions := "" + kinds := "" + controlled := "" + sep := "" + + for _, res := range spec.Resources { + apiversions += sep + res.APIVersion + kinds += sep + res.Kind + if res.Controller { + controlled += sep + "true" + } else { + controlled += sep + "false" + } + sep = "," + } + + return []corev1.EnvVar{ + { + Name: "SINK_URI", + Value: sinkURI, + }, + { + Name: "API_VERSION", + Value: apiversions, + }, + { + Name: "KIND", + Value: kinds, + }, + { + Name: "CONTROLLER", + Value: controlled, + }, + { + Name: "SYSTEM_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + } +} diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go new file mode 100644 index 00000000000..1ad126b3c61 --- /dev/null +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -0,0 +1,127 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" + v1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestMakeReceiveAdapter(t *testing.T) { + src := &v1alpha1.ApiServerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "source-name", + Namespace: "source-namespace", + }, + Spec: v1alpha1.ApiServerSourceSpec{ + ServiceAccountName: "source-svc-acct", + Resources: []v1alpha1.ApiServerResource{ + v1alpha1.ApiServerResource{ + APIVersion: "", + Kind: "Namespace", + }, + v1alpha1.ApiServerResource{ + APIVersion: "", + Kind: "Pod", + Controller: true, + }, + }, + }, + } + + got := MakeReceiveAdapter(&ReceiveAdapterArgs{ + Image: "test-image", + Source: src, + Labels: map[string]string{ + "test-key1": "test-value1", + "test-key2": "test-value2", + }, + SinkURI: "sink-uri", + }) + + one := int32(1) + want := &v1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "source-namespace", + GenerateName: "apiserver-source-name-", + Labels: map[string]string{ + "test-key1": "test-value1", + "test-key2": "test-value2", + }, + }, + Spec: v1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "test-key1": "test-value1", + "test-key2": "test-value2", + }, + }, + Replicas: &one, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + "sidecar.istio.io/inject": "true", + }, + Labels: map[string]string{ + "test-key1": "test-value1", + "test-key2": "test-value2", + }, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: "source-svc-acct", + Containers: []corev1.Container{ + { + Name: "receive-adapter", + Image: "test-image", + Env: []corev1.EnvVar{ + { + Name: "SINK_URI", + Value: "sink-uri", + }, { + Name: "API_VERSION", + Value: ",", + }, { + Name: "KIND", + Value: "Namespace,Pod", + }, { + Name: "CONTROLLER", + Value: "false,true", + }, { + Name: "SYSTEM_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected deploy (-want, +got) = %v", diff) + } +} diff --git a/pkg/reconciler/testing/apiserversource.go b/pkg/reconciler/testing/apiserversource.go new file mode 100644 index 00000000000..158315a3085 --- /dev/null +++ b/pkg/reconciler/testing/apiserversource.go @@ -0,0 +1,73 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/knative/eventing/pkg/apis/sources/v1alpha1" +) + +// ApiServerSourceOption enables further configuration of a ApiServer. +type ApiServerSourceOption func(*v1alpha1.ApiServerSource) + +// NewApiServerSource creates a ApiServer with ApiServerOptions +func NewApiServerSource(name, namespace string, o ...ApiServerSourceOption) *v1alpha1.ApiServerSource { + c := &v1alpha1.ApiServerSource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, opt := range o { + opt(c) + } + //c.SetDefaults(context.Background()) // TODO: We should add defaults and validation. + return c +} + +// WithInitApiServerConditions initializes the ApiServerSource's conditions. +func WithInitApiServerSourceConditions(s *v1alpha1.ApiServerSource) { + s.Status.InitializeConditions() +} + +func WithApiServerSourceSinkNotFound(s *v1alpha1.ApiServerSource) { + s.Status.MarkNoSink("NotFound", "") +} + +func WithApiServerSourceSink(uri string) ApiServerSourceOption { + return func(s *v1alpha1.ApiServerSource) { + s.Status.MarkSink(uri) + } +} + +func WithApiServerSourceDeployed(s *v1alpha1.ApiServerSource) { + s.Status.MarkDeployed() +} + +func WithApiServerSourceDeleted(c *v1alpha1.ApiServerSource) { + t := metav1.NewTime(time.Unix(1e9, 0)) + c.ObjectMeta.SetDeletionTimestamp(&t) +} + +func WithApiServerSourceSpec(spec v1alpha1.ApiServerSourceSpec) ApiServerSourceOption { + return func(c *v1alpha1.ApiServerSource) { + c.Spec = spec + } +} diff --git a/pkg/reconciler/testing/listers.go b/pkg/reconciler/testing/listers.go index 1f41a286e5f..7f7b896b6ab 100644 --- a/pkg/reconciler/testing/listers.go +++ b/pkg/reconciler/testing/listers.go @@ -126,6 +126,10 @@ func (l *Listers) GetCronJobSourceLister() sourcelisters.CronJobSourceLister { return sourcelisters.NewCronJobSourceLister(l.indexerFor(&sourcesv1alpha1.CronJobSource{})) } +func (l *Listers) GetApiServerSourceLister() sourcelisters.ApiServerSourceLister { + return sourcelisters.NewApiServerSourceLister(l.indexerFor(&sourcesv1alpha1.ApiServerSource{})) +} + func (l *Listers) GetContainerSourceLister() sourcelisters.ContainerSourceLister { return sourcelisters.NewContainerSourceLister(l.indexerFor(&sourcesv1alpha1.ContainerSource{})) } diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index dcf5fffc254..2a4f2a03a7b 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -3454,6 +3454,31 @@ SOFTWARE. +=========================================================== +Import: github.com/knative/eventing/vendor/github.com/kelseyhightower/envconfig + +Copyright (c) 2013 Kelsey Hightower + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + + + =========================================================== Import: github.com/knative/eventing/vendor/github.com/knative/pkg diff --git a/vendor/github.com/kelseyhightower/envconfig/LICENSE b/vendor/github.com/kelseyhightower/envconfig/LICENSE new file mode 100644 index 00000000000..4bfa7a84d81 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2013 Kelsey Hightower + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies +of the Software, and to permit persons to whom the Software is furnished to do +so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/kelseyhightower/envconfig/doc.go b/vendor/github.com/kelseyhightower/envconfig/doc.go new file mode 100644 index 00000000000..f28561cd1cb --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/doc.go @@ -0,0 +1,8 @@ +// Copyright (c) 2013 Kelsey Hightower. All rights reserved. +// Use of this source code is governed by the MIT License that can be found in +// the LICENSE file. + +// Package envconfig implements decoding of environment variables based on a user +// defined specification. A typical use is using environment variables for +// configuration settings. +package envconfig diff --git a/vendor/github.com/kelseyhightower/envconfig/env_os.go b/vendor/github.com/kelseyhightower/envconfig/env_os.go new file mode 100644 index 00000000000..a6a014a2b47 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/env_os.go @@ -0,0 +1,7 @@ +// +build appengine + +package envconfig + +import "os" + +var lookupEnv = os.LookupEnv diff --git a/vendor/github.com/kelseyhightower/envconfig/env_syscall.go b/vendor/github.com/kelseyhightower/envconfig/env_syscall.go new file mode 100644 index 00000000000..9d98085b99f --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/env_syscall.go @@ -0,0 +1,7 @@ +// +build !appengine + +package envconfig + +import "syscall" + +var lookupEnv = syscall.Getenv diff --git a/vendor/github.com/kelseyhightower/envconfig/envconfig.go b/vendor/github.com/kelseyhightower/envconfig/envconfig.go new file mode 100644 index 00000000000..892d74699f6 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/envconfig.go @@ -0,0 +1,319 @@ +// Copyright (c) 2013 Kelsey Hightower. All rights reserved. +// Use of this source code is governed by the MIT License that can be found in +// the LICENSE file. + +package envconfig + +import ( + "encoding" + "errors" + "fmt" + "reflect" + "regexp" + "strconv" + "strings" + "time" +) + +// ErrInvalidSpecification indicates that a specification is of the wrong type. +var ErrInvalidSpecification = errors.New("specification must be a struct pointer") + +// A ParseError occurs when an environment variable cannot be converted to +// the type required by a struct field during assignment. +type ParseError struct { + KeyName string + FieldName string + TypeName string + Value string + Err error +} + +// Decoder has the same semantics as Setter, but takes higher precedence. +// It is provided for historical compatibility. +type Decoder interface { + Decode(value string) error +} + +// Setter is implemented by types can self-deserialize values. +// Any type that implements flag.Value also implements Setter. +type Setter interface { + Set(value string) error +} + +func (e *ParseError) Error() string { + return fmt.Sprintf("envconfig.Process: assigning %[1]s to %[2]s: converting '%[3]s' to type %[4]s. details: %[5]s", e.KeyName, e.FieldName, e.Value, e.TypeName, e.Err) +} + +// varInfo maintains information about the configuration variable +type varInfo struct { + Name string + Alt string + Key string + Field reflect.Value + Tags reflect.StructTag +} + +// GatherInfo gathers information about the specified struct +func gatherInfo(prefix string, spec interface{}) ([]varInfo, error) { + expr := regexp.MustCompile("([^A-Z]+|[A-Z][^A-Z]+|[A-Z]+)") + s := reflect.ValueOf(spec) + + if s.Kind() != reflect.Ptr { + return nil, ErrInvalidSpecification + } + s = s.Elem() + if s.Kind() != reflect.Struct { + return nil, ErrInvalidSpecification + } + typeOfSpec := s.Type() + + // over allocate an info array, we will extend if needed later + infos := make([]varInfo, 0, s.NumField()) + for i := 0; i < s.NumField(); i++ { + f := s.Field(i) + ftype := typeOfSpec.Field(i) + if !f.CanSet() || ftype.Tag.Get("ignored") == "true" { + continue + } + + for f.Kind() == reflect.Ptr { + if f.IsNil() { + if f.Type().Elem().Kind() != reflect.Struct { + // nil pointer to a non-struct: leave it alone + break + } + // nil pointer to struct: create a zero instance + f.Set(reflect.New(f.Type().Elem())) + } + f = f.Elem() + } + + // Capture information about the config variable + info := varInfo{ + Name: ftype.Name, + Field: f, + Tags: ftype.Tag, + Alt: strings.ToUpper(ftype.Tag.Get("envconfig")), + } + + // Default to the field name as the env var name (will be upcased) + info.Key = info.Name + + // Best effort to un-pick camel casing as separate words + if ftype.Tag.Get("split_words") == "true" { + words := expr.FindAllStringSubmatch(ftype.Name, -1) + if len(words) > 0 { + var name []string + for _, words := range words { + name = append(name, words[0]) + } + + info.Key = strings.Join(name, "_") + } + } + if info.Alt != "" { + info.Key = info.Alt + } + if prefix != "" { + info.Key = fmt.Sprintf("%s_%s", prefix, info.Key) + } + info.Key = strings.ToUpper(info.Key) + infos = append(infos, info) + + if f.Kind() == reflect.Struct { + // honor Decode if present + if decoderFrom(f) == nil && setterFrom(f) == nil && textUnmarshaler(f) == nil { + innerPrefix := prefix + if !ftype.Anonymous { + innerPrefix = info.Key + } + + embeddedPtr := f.Addr().Interface() + embeddedInfos, err := gatherInfo(innerPrefix, embeddedPtr) + if err != nil { + return nil, err + } + infos = append(infos[:len(infos)-1], embeddedInfos...) + + continue + } + } + } + return infos, nil +} + +// Process populates the specified struct based on environment variables +func Process(prefix string, spec interface{}) error { + infos, err := gatherInfo(prefix, spec) + + for _, info := range infos { + + // `os.Getenv` cannot differentiate between an explicitly set empty value + // and an unset value. `os.LookupEnv` is preferred to `syscall.Getenv`, + // but it is only available in go1.5 or newer. We're using Go build tags + // here to use os.LookupEnv for >=go1.5 + value, ok := lookupEnv(info.Key) + if !ok && info.Alt != "" { + value, ok = lookupEnv(info.Alt) + } + + def := info.Tags.Get("default") + if def != "" && !ok { + value = def + } + + req := info.Tags.Get("required") + if !ok && def == "" { + if req == "true" { + return fmt.Errorf("required key %s missing value", info.Key) + } + continue + } + + err := processField(value, info.Field) + if err != nil { + return &ParseError{ + KeyName: info.Key, + FieldName: info.Name, + TypeName: info.Field.Type().String(), + Value: value, + Err: err, + } + } + } + + return err +} + +// MustProcess is the same as Process but panics if an error occurs +func MustProcess(prefix string, spec interface{}) { + if err := Process(prefix, spec); err != nil { + panic(err) + } +} + +func processField(value string, field reflect.Value) error { + typ := field.Type() + + decoder := decoderFrom(field) + if decoder != nil { + return decoder.Decode(value) + } + // look for Set method if Decode not defined + setter := setterFrom(field) + if setter != nil { + return setter.Set(value) + } + + if t := textUnmarshaler(field); t != nil { + return t.UnmarshalText([]byte(value)) + } + + if typ.Kind() == reflect.Ptr { + typ = typ.Elem() + if field.IsNil() { + field.Set(reflect.New(typ)) + } + field = field.Elem() + } + + switch typ.Kind() { + case reflect.String: + field.SetString(value) + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + var ( + val int64 + err error + ) + if field.Kind() == reflect.Int64 && typ.PkgPath() == "time" && typ.Name() == "Duration" { + var d time.Duration + d, err = time.ParseDuration(value) + val = int64(d) + } else { + val, err = strconv.ParseInt(value, 0, typ.Bits()) + } + if err != nil { + return err + } + + field.SetInt(val) + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + val, err := strconv.ParseUint(value, 0, typ.Bits()) + if err != nil { + return err + } + field.SetUint(val) + case reflect.Bool: + val, err := strconv.ParseBool(value) + if err != nil { + return err + } + field.SetBool(val) + case reflect.Float32, reflect.Float64: + val, err := strconv.ParseFloat(value, typ.Bits()) + if err != nil { + return err + } + field.SetFloat(val) + case reflect.Slice: + vals := strings.Split(value, ",") + sl := reflect.MakeSlice(typ, len(vals), len(vals)) + for i, val := range vals { + err := processField(val, sl.Index(i)) + if err != nil { + return err + } + } + field.Set(sl) + case reflect.Map: + pairs := strings.Split(value, ",") + mp := reflect.MakeMap(typ) + for _, pair := range pairs { + kvpair := strings.Split(pair, ":") + if len(kvpair) != 2 { + return fmt.Errorf("invalid map item: %q", pair) + } + k := reflect.New(typ.Key()).Elem() + err := processField(kvpair[0], k) + if err != nil { + return err + } + v := reflect.New(typ.Elem()).Elem() + err = processField(kvpair[1], v) + if err != nil { + return err + } + mp.SetMapIndex(k, v) + } + field.Set(mp) + } + + return nil +} + +func interfaceFrom(field reflect.Value, fn func(interface{}, *bool)) { + // it may be impossible for a struct field to fail this check + if !field.CanInterface() { + return + } + var ok bool + fn(field.Interface(), &ok) + if !ok && field.CanAddr() { + fn(field.Addr().Interface(), &ok) + } +} + +func decoderFrom(field reflect.Value) (d Decoder) { + interfaceFrom(field, func(v interface{}, ok *bool) { d, *ok = v.(Decoder) }) + return d +} + +func setterFrom(field reflect.Value) (s Setter) { + interfaceFrom(field, func(v interface{}, ok *bool) { s, *ok = v.(Setter) }) + return s +} + +func textUnmarshaler(field reflect.Value) (t encoding.TextUnmarshaler) { + interfaceFrom(field, func(v interface{}, ok *bool) { t, *ok = v.(encoding.TextUnmarshaler) }) + return t +} diff --git a/vendor/github.com/kelseyhightower/envconfig/usage.go b/vendor/github.com/kelseyhightower/envconfig/usage.go new file mode 100644 index 00000000000..184635380f2 --- /dev/null +++ b/vendor/github.com/kelseyhightower/envconfig/usage.go @@ -0,0 +1,158 @@ +// Copyright (c) 2016 Kelsey Hightower and others. All rights reserved. +// Use of this source code is governed by the MIT License that can be found in +// the LICENSE file. + +package envconfig + +import ( + "encoding" + "fmt" + "io" + "os" + "reflect" + "strconv" + "strings" + "text/tabwriter" + "text/template" +) + +const ( + // DefaultListFormat constant to use to display usage in a list format + DefaultListFormat = `This application is configured via the environment. The following environment +variables can be used: +{{range .}} +{{usage_key .}} + [description] {{usage_description .}} + [type] {{usage_type .}} + [default] {{usage_default .}} + [required] {{usage_required .}}{{end}} +` + // DefaultTableFormat constant to use to display usage in a tabluar format + DefaultTableFormat = `This application is configured via the environment. The following environment +variables can be used: + +KEY TYPE DEFAULT REQUIRED DESCRIPTION +{{range .}}{{usage_key .}} {{usage_type .}} {{usage_default .}} {{usage_required .}} {{usage_description .}} +{{end}}` +) + +var ( + decoderType = reflect.TypeOf((*Decoder)(nil)).Elem() + setterType = reflect.TypeOf((*Setter)(nil)).Elem() + unmarshalerType = reflect.TypeOf((*encoding.TextUnmarshaler)(nil)).Elem() +) + +func implementsInterface(t reflect.Type) bool { + return t.Implements(decoderType) || + reflect.PtrTo(t).Implements(decoderType) || + t.Implements(setterType) || + reflect.PtrTo(t).Implements(setterType) || + t.Implements(unmarshalerType) || + reflect.PtrTo(t).Implements(unmarshalerType) +} + +// toTypeDescription converts Go types into a human readable description +func toTypeDescription(t reflect.Type) string { + switch t.Kind() { + case reflect.Array, reflect.Slice: + return fmt.Sprintf("Comma-separated list of %s", toTypeDescription(t.Elem())) + case reflect.Map: + return fmt.Sprintf( + "Comma-separated list of %s:%s pairs", + toTypeDescription(t.Key()), + toTypeDescription(t.Elem()), + ) + case reflect.Ptr: + return toTypeDescription(t.Elem()) + case reflect.Struct: + if implementsInterface(t) && t.Name() != "" { + return t.Name() + } + return "" + case reflect.String: + name := t.Name() + if name != "" && name != "string" { + return name + } + return "String" + case reflect.Bool: + name := t.Name() + if name != "" && name != "bool" { + return name + } + return "True or False" + case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: + name := t.Name() + if name != "" && !strings.HasPrefix(name, "int") { + return name + } + return "Integer" + case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: + name := t.Name() + if name != "" && !strings.HasPrefix(name, "uint") { + return name + } + return "Unsigned Integer" + case reflect.Float32, reflect.Float64: + name := t.Name() + if name != "" && !strings.HasPrefix(name, "float") { + return name + } + return "Float" + } + return fmt.Sprintf("%+v", t) +} + +// Usage writes usage information to stderr using the default header and table format +func Usage(prefix string, spec interface{}) error { + // The default is to output the usage information as a table + // Create tabwriter instance to support table output + tabs := tabwriter.NewWriter(os.Stdout, 1, 0, 4, ' ', 0) + + err := Usagef(prefix, spec, tabs, DefaultTableFormat) + tabs.Flush() + return err +} + +// Usagef writes usage information to the specified io.Writer using the specifed template specification +func Usagef(prefix string, spec interface{}, out io.Writer, format string) error { + + // Specify the default usage template functions + functions := template.FuncMap{ + "usage_key": func(v varInfo) string { return v.Key }, + "usage_description": func(v varInfo) string { return v.Tags.Get("desc") }, + "usage_type": func(v varInfo) string { return toTypeDescription(v.Field.Type()) }, + "usage_default": func(v varInfo) string { return v.Tags.Get("default") }, + "usage_required": func(v varInfo) (string, error) { + req := v.Tags.Get("required") + if req != "" { + reqB, err := strconv.ParseBool(req) + if err != nil { + return "", err + } + if reqB { + req = "true" + } + } + return req, nil + }, + } + + tmpl, err := template.New("envconfig").Funcs(functions).Parse(format) + if err != nil { + return err + } + + return Usaget(prefix, spec, out, tmpl) +} + +// Usaget writes usage information to the specified io.Writer using the specified template +func Usaget(prefix string, spec interface{}, out io.Writer, tmpl *template.Template) error { + // gather first + infos, err := gatherInfo(prefix, spec) + if err != nil { + return err + } + + return tmpl.Execute(out, infos) +}