From b0e4bb1b6cde0fe23054a87aad377ba0047e1705 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 6 May 2019 16:14:01 -0700 Subject: [PATCH 01/16] working through using the informer directly. --- cmd/apiserver_receive_adapter/main.go | 62 ++++----- pkg/adapter/apiserver/adapter.go | 188 +++++++++++++++++++++++++- sample/apiserver_pod.yaml | 46 +++++++ 3 files changed, 257 insertions(+), 39 deletions(-) create mode 100644 sample/apiserver_pod.yaml diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index e7bc7004eed..3b5f10cdcbe 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -18,21 +18,12 @@ package main import ( "flag" - "time" - - "k8s.io/client-go/rest" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "github.com/knative/eventing/pkg/reconciler" - "github.com/kelseyhightower/envconfig" "github.com/knative/eventing/pkg/adapter/apiserver" "github.com/knative/eventing/pkg/kncloudevents" - "github.com/knative/pkg/apis/duck" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/signals" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -48,12 +39,15 @@ var ( ) type envConfig struct { + Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` SinkURI string `split_words:"true" required:"true"` ApiVersion []string `split_words:"true" required:"true"` Kind []string `required:"true"` Controller []bool `required:"true"` } +// TODO: the controller should make the list of GVR + func main() { flag.Parse() @@ -79,10 +73,10 @@ func main() { logger = logger.With(zap.String("controller/apiserver", "adapter")) logger.Info("Starting the controller") - numControllers := len(env.ApiVersion) - cfg.QPS = float32(numControllers) * rest.DefaultQPS - cfg.Burst = numControllers * rest.DefaultBurst - opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) + //numControllers := len(env.ApiVersion) + //cfg.QPS = float32(numControllers) * rest.DefaultQPS + //cfg.Burst = numControllers * rest.DefaultBurst + //opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) client, err := dynamic.NewForConfig(cfg) if err != nil { @@ -94,21 +88,20 @@ func main() { logger.Fatalw("Error building cloud event client", zap.Error(err)) } - controllers := []*kncontroller.Impl{} + gvrs := []schema.GroupVersionResource(nil) - // Create one controller per resource. for i, apiVersion := range env.ApiVersion { kind := env.Kind[i] - controlled := env.Controller[i] + // controlled := env.Controller[i] - obj := &duckv1alpha1.AddressableType{} + // obj := &duckv1alpha1.AddressableType{} - factory := duck.TypedInformerFactory{ - Client: client, - ResyncPeriod: time.Duration(10), // TODO - StopChannel: stopCh, - Type: obj, - } + //factory := duck.TypedInformerFactory{ + // Client: client, + // ResyncPeriod: time.Duration(10 * time.Hour), + // StopChannel: stopCh, + // Type: obj, + //} gv, err := schema.ParseGroupVersion(apiVersion) if err != nil { @@ -120,17 +113,20 @@ func main() { // This is really bad. gvr, _ := meta.UnsafeGuessKindToResource(gvk) + gvrs = append(gvrs, gvr) + // Get and start the informer for gvr - logger.Infof("Starting informer for %v", gvk) - informer, lister, err := factory.Get(gvr) - if err != nil { - logger.Fatalw("Error starting informer", zap.Error(err)) - } - controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled)) + //logger.Infof("Starting informer for %v", gvk) + //informer, lister, err := factory.Get(gvr) + //if err != nil { + // logger.Fatalw("Error starting informer", zap.Error(err)) + //} + //controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled)) } - // Start all of the controllers. - logger.Info("Starting controllers.") - go kncontroller.StartAll(stopCh, controllers...) - <-stopCh + a := apiserver.NewAdaptor("this_source", client, eventsClient, logger, gvrs...) + logger.Info("starting kubernetes api adapter") + if err := a.Start(stopCh); err != nil { + logger.Warn("start returned an error,", zap.Error(err)) + } } diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index db14f450645..22a3006b7ec 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -18,9 +18,14 @@ package apiserver import ( "context" + "fmt" + "github.com/knative/pkg/apis/duck" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "strings" + "time" - "github.com/cloudevents/sdk-go/pkg/cloudevents" - eventsclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + cloudevents "github.com/cloudevents/sdk-go" "github.com/cloudevents/sdk-go/pkg/cloudevents/types" "github.com/knative/eventing/pkg/reconciler" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" @@ -36,17 +41,188 @@ const ( ReconcilerName = "ApiServerSource" controllerAgentName = "apiserver-source-adapter-controller" - updateEventType = "dev.knative.apiserver.object.update" - deleteEventType = "dev.knative.apiserver.object.delete" + + addEventType = "dev.knative.apiserver.object.add" + updateEventType = "dev.knative.apiserver.object.update" + deleteEventType = "dev.knative.apiserver.object.delete" ) +type KubernetesEvent struct { + Object *corev1.Event `json:"obj,omitempty"` + NewObject *corev1.Event `json:"newObj,omitempty"` + OldObject *corev1.Event `json:"oldObj,omitempty"` +} + +// Creates a URI of the form found in object metadata selfLinks +// Format looks like: /apis/feeds.knative.dev/v1alpha1/namespaces/default/feeds/k8s-events-example +// KNOWN ISSUES: +// * ObjectReference.APIVersion has no version information (e.g. serving.knative.dev rather than serving.knative.dev/v1alpha1) +// * ObjectReference does not have enough information to create the pluaralized list type (e.g. "revisions" from kind: Revision) +// +// Track these issues at https://github.com/kubernetes/kubernetes/issues/66313 +// We could possibly work around this by adding a lister for the resources referenced by these events. +func createSelfLink(o corev1.ObjectReference) string { + collectionNameHack := strings.ToLower(o.Kind) + "s" + versionNameHack := o.APIVersion + + // Core API types don't have a separate package name and only have a version string (e.g. /apis/v1/namespaces/default/pods/myPod) + // To avoid weird looking strings like "v1/versionUnknown" we'll sniff for a "." in the version + if strings.Contains(versionNameHack, ".") && !strings.Contains(versionNameHack, "/") { + versionNameHack = versionNameHack + "/versionUnknown" + } + return fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", versionNameHack, o.Namespace, collectionNameHack, o.Name) +} + +/* + + + eventsInformer := coreinformers.NewFilteredEventInformer( + a.kubeClient, a.Namespace, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil) + + eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: a.addEvent, + UpdateFunc: a.updateEvent, + }) + + logger.Debug("Starting eventsInformer...") + stop := make(chan struct{}) + go eventsInformer.Run(stop) + + logger.Debug("waiting for caches to sync...") + if ok := cache.WaitForCacheSync(stopCh, eventsInformer.HasSynced); !ok { + return fmt.Errorf("failed to wait for events cache to sync") + } + logger.Debug("caches synced...") + <-stopCh + stop <- struct{}{} + return nil + +*/ + +type Adapter interface { + Start(stopCh <-chan struct{}) error +} + +type adapter struct { + gvrs []schema.GroupVersionResource + + k8s dynamic.Interface + ce cloudevents.Client + source string + namespace string + logger *zap.SugaredLogger +} + +func NewAdaptor(source string, namespace string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, gvr ...schema.GroupVersionResource) Adapter { + a := &adapter{ + k8s: k8sClient, + ce: ceClient, + source: source, + namespace: namespace, + gvrs: gvr, + logger: logger, + } + return a +} + +/* +TODO: No longer sending events for the controller of the updated object, a al: + + if controlled { + informer.AddEventHandler(reconciler.Handler(impl.EnqueueControllerOf)) + } else { + informer.AddEventHandler(reconciler.Handler(impl.Enqueue)) + } + +*/ + +func (a *adapter) Start(stopCh <-chan struct{}) error { + + // TODO: the current duck informer has no namespace. fix that. + + // Local stop channel. + stop := make(chan struct{}) + + factory := duck.TypedInformerFactory{ + Client: a.k8s, + ResyncPeriod: time.Duration(10 * time.Hour), + StopChannel: stop, + Type: &duckv1alpha1.KResource{}, + } + + for _, gvr := range a.gvrs { + informer, _, err := factory.Get(gvr) + if err != nil { + return err + } + + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: a.addEvent, + UpdateFunc: a.updateEvent, + DeleteFunc: a.deleteEvent, + }) + } + + <-stopCh + stop <- struct{}{} + return nil +} + +func (a *adapter) addEvent(obj interface{}) { + objEvent := obj.(*corev1.Event) + + if err := a.send(addEventType, createSelfLink(objEvent.InvolvedObject), &KubernetesEvent{ + Object: objEvent, + }); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *adapter) updateEvent(oldObj, newObj interface{}) { + objEvent := newObj.(*corev1.Event) + + if err := a.send(updateEventType, createSelfLink(objEvent.InvolvedObject), &KubernetesEvent{ + NewObject: objEvent, + OldObject: oldObj.(*corev1.Event), + }); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *adapter) deleteEvent(obj interface{}) { + objEvent := obj.(*corev1.Event) + + if err := a.send(deleteEventType, createSelfLink(objEvent.InvolvedObject), &KubernetesEvent{ + Object: objEvent, + }); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *adapter) send(eventType, subject string, data *KubernetesEvent) error { + event := cloudevents.NewEvent() + event.SetType(eventType) + event.SetSource(a.source) + event.SetSubject(subject) + + if err := event.SetData(data); err != nil { + a.logger.Warn("failed to set event data for kubernetes event") + return err + } + + if _, err := a.ce.Send(context.TODO(), event); err != nil { + return err + } + return nil +} + // NewController initializes the controller and is called by the generated code // Registers event handlers to enqueue events func NewController( opt reconciler.Options, informer cache.SharedInformer, lister cache.GenericLister, - eventsclient eventsclient.Client, + eventsclient cloudevents.Client, controlled bool) *controller.Impl { r := &Reconciler{ @@ -70,7 +246,7 @@ func NewController( type Reconciler struct { *reconciler.Base - eventsClient eventsclient.Client + eventsClient cloudevents.Client lister cache.GenericLister } diff --git a/sample/apiserver_pod.yaml b/sample/apiserver_pod.yaml new file mode 100644 index 00000000000..b9eb1564256 --- /dev/null +++ b/sample/apiserver_pod.yaml @@ -0,0 +1,46 @@ +apiVersion: sources.eventing.knative.dev/v1alpha1 +kind: ApiServerSource +metadata: + name: api-server-source +spec: + serviceAccountName: events-sa + resources: + - apiVersion: v1 + kind: Pods + sink: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Broker + name: default +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: events-sa + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: event-watcher +rules: + - apiGroups: + - v1 + resources: + - pods + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: k8s-ra-event-watcher +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: event-watcher +subjects: + - kind: ServiceAccount + name: events-sa + namespace: default \ No newline at end of file From 47b37760a8288fa28dbccd86aca4ff9b8180a467 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 08:46:45 -0700 Subject: [PATCH 02/16] getting closer.: --- cmd/apiserver_receive_adapter/main.go | 2 +- pkg/adapter/apiserver/adapter.go | 47 +++++++++++++------ sample/apiserver_pod.yaml | 4 +- .../github.com/knative/pkg/apis/duck/typed.go | 22 +++++++++ 4 files changed, 57 insertions(+), 18 deletions(-) diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 3b5f10cdcbe..d73007f38cf 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -124,7 +124,7 @@ func main() { //controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled)) } - a := apiserver.NewAdaptor("this_source", client, eventsClient, logger, gvrs...) + a := apiserver.NewAdaptor("this_source", env.Namespace, client, eventsClient, logger, gvrs...) logger.Info("starting kubernetes api adapter") if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 22a3006b7ec..db00a4c1713 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -48,9 +48,9 @@ const ( ) type KubernetesEvent struct { - Object *corev1.Event `json:"obj,omitempty"` - NewObject *corev1.Event `json:"newObj,omitempty"` - OldObject *corev1.Event `json:"oldObj,omitempty"` + Object *duckv1alpha1.KResource `json:"obj,omitempty"` + NewObject *duckv1alpha1.KResource `json:"newObj,omitempty"` + OldObject *duckv1alpha1.KResource `json:"oldObj,omitempty"` } // Creates a URI of the form found in object metadata selfLinks @@ -150,8 +150,18 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { Type: &duckv1alpha1.KResource{}, } + //dynamic.NamespaceableResourceInterface() + + //eventsInformer := coreinformers.NewFilteredEventInformer( + // a.kubeClient, a.Namespace, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil) + + //eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + // AddFunc: a.addEvent, + // UpdateFunc: a.updateEvent, + //}) + for _, gvr := range a.gvrs { - informer, _, err := factory.Get(gvr) + informer, _, err := factory.GetNamespaced(gvr, a.namespace) if err != nil { return err } @@ -169,37 +179,44 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { } func (a *adapter) addEvent(obj interface{}) { - objEvent := obj.(*corev1.Event) + object := obj.(*duckv1alpha1.KResource) - if err := a.send(addEventType, createSelfLink(objEvent.InvolvedObject), &KubernetesEvent{ - Object: objEvent, + if err := a.send(addEventType, object, &KubernetesEvent{ + Object: object, }); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) } } func (a *adapter) updateEvent(oldObj, newObj interface{}) { - objEvent := newObj.(*corev1.Event) + object := newObj.(*duckv1alpha1.KResource) - if err := a.send(updateEventType, createSelfLink(objEvent.InvolvedObject), &KubernetesEvent{ - NewObject: objEvent, - OldObject: oldObj.(*corev1.Event), + if err := a.send(updateEventType, object, &KubernetesEvent{ + NewObject: object, + OldObject: oldObj.(*duckv1alpha1.KResource), }); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) } } func (a *adapter) deleteEvent(obj interface{}) { - objEvent := obj.(*corev1.Event) + object := obj.(*duckv1alpha1.KResource) - if err := a.send(deleteEventType, createSelfLink(objEvent.InvolvedObject), &KubernetesEvent{ - Object: objEvent, + if err := a.send(deleteEventType, object, &KubernetesEvent{ + Object: object, }); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) } } -func (a *adapter) send(eventType, subject string, data *KubernetesEvent) error { +func (a *adapter) send(eventType string, obj *duckv1alpha1.KResource, data *KubernetesEvent) error { + subject := createSelfLink(corev1.ObjectReference{ + APIVersion: obj.APIVersion, + Kind: obj.Kind, + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }) + event := cloudevents.NewEvent() event.SetType(eventType) event.SetSource(a.source) diff --git a/sample/apiserver_pod.yaml b/sample/apiserver_pod.yaml index b9eb1564256..501c08e2d95 100644 --- a/sample/apiserver_pod.yaml +++ b/sample/apiserver_pod.yaml @@ -6,7 +6,7 @@ spec: serviceAccountName: events-sa resources: - apiVersion: v1 - kind: Pods + kind: Pod sink: apiVersion: eventing.knative.dev/v1alpha1 kind: Broker @@ -24,7 +24,7 @@ metadata: name: event-watcher rules: - apiGroups: - - v1 + - "" resources: - pods verbs: diff --git a/vendor/github.com/knative/pkg/apis/duck/typed.go b/vendor/github.com/knative/pkg/apis/duck/typed.go index 9d29c1e0bdc..92a0fbc560d 100644 --- a/vendor/github.com/knative/pkg/apis/duck/typed.go +++ b/vendor/github.com/knative/pkg/apis/duck/typed.go @@ -66,6 +66,28 @@ func (dif *TypedInformerFactory) Get(gvr schema.GroupVersionResource) (cache.Sha return inf, lister, nil } +// Get implements InformerFactory. +func (dif *TypedInformerFactory) GetNamespaced(gvr schema.GroupVersionResource, namespace string) (cache.SharedIndexInformer, cache.GenericNamespaceLister, error) { + listObj := dif.Type.GetListType() + lw := &cache.ListWatch{ + ListFunc: asStructuredLister(dif.Client.Resource(gvr).Namespace(namespace).List, listObj), + WatchFunc: AsStructuredWatcher(dif.Client.Resource(gvr).Namespace(namespace).Watch, dif.Type), + } + inf := cache.NewSharedIndexInformer(lw, dif.Type, dif.ResyncPeriod, cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }) + + lister := cache.NewGenericLister(inf.GetIndexer(), gvr.GroupResource()).ByNamespace(namespace) + + go inf.Run(dif.StopChannel) + + if ok := cache.WaitForCacheSync(dif.StopChannel, inf.HasSynced); !ok { + return nil, nil, fmt.Errorf("failed starting shared index informer for %v with type %T on namespace %s", gvr, dif.Type, namespace) + } + + return inf, lister, nil +} + type unstructuredLister func(metav1.ListOptions) (*unstructured.UnstructuredList, error) func asStructuredLister(ulist unstructuredLister, listObj runtime.Object) cache.ListFunc { From bd8807691faef8d5180d2d8c1b3d5b089a7dfc27 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 09:22:57 -0700 Subject: [PATCH 03/16] working. --- pkg/adapter/apiserver/adapter.go | 31 ++++++++---- .../resources/receive_adapter.go | 49 ++++++++++--------- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index db00a4c1713..9bf16399d8f 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -104,23 +104,27 @@ type Adapter interface { } type adapter struct { - gvrs []schema.GroupVersionResource + APIVersion string + Kind string + gvrs []schema.GroupVersionResource k8s dynamic.Interface ce cloudevents.Client - source string + name string namespace string logger *zap.SugaredLogger } -func NewAdaptor(source string, namespace string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, gvr ...schema.GroupVersionResource) Adapter { +func NewAdaptor(name string, namespace string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, gvr ...schema.GroupVersionResource) Adapter { a := &adapter{ - k8s: k8sClient, - ce: ceClient, - source: source, - namespace: namespace, - gvrs: gvr, - logger: logger, + APIVersion: "v1", + Kind: "Pod", + k8s: k8sClient, + ce: ceClient, + name: name, + namespace: namespace, + gvrs: gvr, + logger: logger, } return a } @@ -217,9 +221,16 @@ func (a *adapter) send(eventType string, obj *duckv1alpha1.KResource, data *Kube Namespace: obj.GetNamespace(), }) + source := createSelfLink(corev1.ObjectReference{ + APIVersion: a.APIVersion, + Kind: a.Kind, + Name: a.name, + Namespace: a.namespace, + }) + event := cloudevents.NewEvent() event.SetType(eventType) - event.SetSource(a.source) + event.SetSource(source) event.SetSubject(subject) if err := event.SetData(data); err != nil { diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 378e800f01b..3fcebdd214b 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -56,7 +56,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - "sidecar.istio.io/inject": "true", + "sidecar.istio.io/inject": "false", }, Labels: args.Labels, }, @@ -92,30 +92,31 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar sep = "," } - return []corev1.EnvVar{ - { - Name: "SINK_URI", - Value: sinkURI, - }, - { - Name: "API_VERSION", - Value: apiversions, - }, - { - Name: "KIND", - Value: kinds, - }, - { - Name: "CONTROLLER", - Value: controlled, + return []corev1.EnvVar{{ + Name: "SINK_URI", + Value: sinkURI, + }, { + Name: "API_VERSION", + Value: apiversions, + }, { + Name: "KIND", + Value: kinds, + }, { + Name: "CONTROLLER", + Value: controlled, + }, { + Name: "SYSTEM_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", + }, }, - { - Name: "SYSTEM_NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, + }, { + Name: "SYSTEM_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.name", }, }, - } + }} } From a9d7162cd0ae860788dc7b4ecfd89e6fed1e24fe Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 10:23:07 -0700 Subject: [PATCH 04/16] cleanup. --- cmd/apiserver_receive_adapter/main.go | 33 +--- pkg/adapter/apiserver/adapter.go | 259 +++----------------------- pkg/adapter/apiserver/adapter_test.go | 186 +----------------- pkg/adapter/apiserver/event.go | 102 ++++++++++ 4 files changed, 135 insertions(+), 445 deletions(-) create mode 100644 pkg/adapter/apiserver/event.go diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index d73007f38cf..8557671f003 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -46,7 +46,7 @@ type envConfig struct { Controller []bool `required:"true"` } -// TODO: the controller should make the list of GVR +// TODO: the controller should take the list of GVR func main() { flag.Parse() @@ -73,11 +73,6 @@ func main() { logger = logger.With(zap.String("controller/apiserver", "adapter")) logger.Info("Starting the controller") - //numControllers := len(env.ApiVersion) - //cfg.QPS = float32(numControllers) * rest.DefaultQPS - //cfg.Burst = numControllers * rest.DefaultBurst - //opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) - client, err := dynamic.NewForConfig(cfg) if err != nil { logger.Fatalw("Error building dynamic client", zap.Error(err)) @@ -92,39 +87,17 @@ func main() { for i, apiVersion := range env.ApiVersion { kind := env.Kind[i] - // controlled := env.Controller[i] - - // obj := &duckv1alpha1.AddressableType{} - - //factory := duck.TypedInformerFactory{ - // Client: client, - // ResyncPeriod: time.Duration(10 * time.Hour), - // StopChannel: stopCh, - // Type: obj, - //} gv, err := schema.ParseGroupVersion(apiVersion) if err != nil { logger.Fatalw("Error parsing APIVersion", zap.Error(err)) } - - gvk := schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version} - // This is really bad. - gvr, _ := meta.UnsafeGuessKindToResource(gvk) - + gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) gvrs = append(gvrs, gvr) - - // Get and start the informer for gvr - //logger.Infof("Starting informer for %v", gvk) - //informer, lister, err := factory.Get(gvr) - //if err != nil { - // logger.Fatalw("Error starting informer", zap.Error(err)) - //} - //controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled)) } - a := apiserver.NewAdaptor("this_source", env.Namespace, client, eventsClient, logger, gvrs...) + a := apiserver.NewAdaptor(cfg.Host, env.Namespace, client, eventsClient, logger, gvrs...) logger.Info("starting kubernetes api adapter") if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 9bf16399d8f..5e344f67918 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -18,113 +18,39 @@ package apiserver import ( "context" - "fmt" - "github.com/knative/pkg/apis/duck" + "time" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" - "strings" - "time" + "k8s.io/client-go/tools/cache" cloudevents "github.com/cloudevents/sdk-go" - "github.com/cloudevents/sdk-go/pkg/cloudevents/types" - "github.com/knative/eventing/pkg/reconciler" + "github.com/knative/pkg/apis/duck" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - "github.com/knative/pkg/controller" "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/client-go/tools/cache" -) - -const ( - // ReconcilerName is the name of the reconciler - ReconcilerName = "ApiServerSource" - - controllerAgentName = "apiserver-source-adapter-controller" - - addEventType = "dev.knative.apiserver.object.add" - updateEventType = "dev.knative.apiserver.object.update" - deleteEventType = "dev.knative.apiserver.object.delete" ) -type KubernetesEvent struct { - Object *duckv1alpha1.KResource `json:"obj,omitempty"` - NewObject *duckv1alpha1.KResource `json:"newObj,omitempty"` - OldObject *duckv1alpha1.KResource `json:"oldObj,omitempty"` -} - -// Creates a URI of the form found in object metadata selfLinks -// Format looks like: /apis/feeds.knative.dev/v1alpha1/namespaces/default/feeds/k8s-events-example -// KNOWN ISSUES: -// * ObjectReference.APIVersion has no version information (e.g. serving.knative.dev rather than serving.knative.dev/v1alpha1) -// * ObjectReference does not have enough information to create the pluaralized list type (e.g. "revisions" from kind: Revision) -// -// Track these issues at https://github.com/kubernetes/kubernetes/issues/66313 -// We could possibly work around this by adding a lister for the resources referenced by these events. -func createSelfLink(o corev1.ObjectReference) string { - collectionNameHack := strings.ToLower(o.Kind) + "s" - versionNameHack := o.APIVersion - - // Core API types don't have a separate package name and only have a version string (e.g. /apis/v1/namespaces/default/pods/myPod) - // To avoid weird looking strings like "v1/versionUnknown" we'll sniff for a "." in the version - if strings.Contains(versionNameHack, ".") && !strings.Contains(versionNameHack, "/") { - versionNameHack = versionNameHack + "/versionUnknown" - } - return fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", versionNameHack, o.Namespace, collectionNameHack, o.Name) -} - -/* - - - eventsInformer := coreinformers.NewFilteredEventInformer( - a.kubeClient, a.Namespace, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil) - - eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: a.addEvent, - UpdateFunc: a.updateEvent, - }) - - logger.Debug("Starting eventsInformer...") - stop := make(chan struct{}) - go eventsInformer.Run(stop) - - logger.Debug("waiting for caches to sync...") - if ok := cache.WaitForCacheSync(stopCh, eventsInformer.HasSynced); !ok { - return fmt.Errorf("failed to wait for events cache to sync") - } - logger.Debug("caches synced...") - <-stopCh - stop <- struct{}{} - return nil - -*/ - type Adapter interface { Start(stopCh <-chan struct{}) error } type adapter struct { - APIVersion string - Kind string - gvrs []schema.GroupVersionResource k8s dynamic.Interface ce cloudevents.Client - name string + source string namespace string logger *zap.SugaredLogger } -func NewAdaptor(name string, namespace string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, gvr ...schema.GroupVersionResource) Adapter { +func NewAdaptor(source, namespace string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, gvr ...schema.GroupVersionResource) Adapter { a := &adapter{ - APIVersion: "v1", - Kind: "Pod", - k8s: k8sClient, - ce: ceClient, - name: name, - namespace: namespace, - gvrs: gvr, - logger: logger, + k8s: k8sClient, + ce: ceClient, + source: source, + namespace: namespace, + gvrs: gvr, + logger: logger, } return a } @@ -141,9 +67,6 @@ TODO: No longer sending events for the controller of the updated object, a al: */ func (a *adapter) Start(stopCh <-chan struct{}) error { - - // TODO: the current duck informer has no namespace. fix that. - // Local stop channel. stop := make(chan struct{}) @@ -154,16 +77,6 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { Type: &duckv1alpha1.KResource{}, } - //dynamic.NamespaceableResourceInterface() - - //eventsInformer := coreinformers.NewFilteredEventInformer( - // a.kubeClient, a.Namespace, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil) - - //eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - // AddFunc: a.addEvent, - // UpdateFunc: a.updateEvent, - //}) - for _, gvr := range a.gvrs { informer, _, err := factory.GetNamespaced(gvr, a.namespace) if err != nil { @@ -183,151 +96,37 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { } func (a *adapter) addEvent(obj interface{}) { - object := obj.(*duckv1alpha1.KResource) + event, err := a.makeAddEvent(obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } - if err := a.send(addEventType, object, &KubernetesEvent{ - Object: object, - }); err != nil { + if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) } } func (a *adapter) updateEvent(oldObj, newObj interface{}) { - object := newObj.(*duckv1alpha1.KResource) - - if err := a.send(updateEventType, object, &KubernetesEvent{ - NewObject: object, - OldObject: oldObj.(*duckv1alpha1.KResource), - }); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) + event, err := a.makeUpdateEvent(oldObj, newObj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return } -} - -func (a *adapter) deleteEvent(obj interface{}) { - object := obj.(*duckv1alpha1.KResource) - if err := a.send(deleteEventType, object, &KubernetesEvent{ - Object: object, - }); err != nil { + if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) } } -func (a *adapter) send(eventType string, obj *duckv1alpha1.KResource, data *KubernetesEvent) error { - subject := createSelfLink(corev1.ObjectReference{ - APIVersion: obj.APIVersion, - Kind: obj.Kind, - Name: obj.GetName(), - Namespace: obj.GetNamespace(), - }) - - source := createSelfLink(corev1.ObjectReference{ - APIVersion: a.APIVersion, - Kind: a.Kind, - Name: a.name, - Namespace: a.namespace, - }) - - event := cloudevents.NewEvent() - event.SetType(eventType) - event.SetSource(source) - event.SetSubject(subject) - - if err := event.SetData(data); err != nil { - a.logger.Warn("failed to set event data for kubernetes event") - return err - } - - if _, err := a.ce.Send(context.TODO(), event); err != nil { - return err - } - return nil -} - -// NewController initializes the controller and is called by the generated code -// Registers event handlers to enqueue events -func NewController( - opt reconciler.Options, - informer cache.SharedInformer, - lister cache.GenericLister, - eventsclient cloudevents.Client, - controlled bool) *controller.Impl { - - r := &Reconciler{ - Base: reconciler.NewBase(opt, controllerAgentName), - lister: lister, - eventsClient: eventsclient, - } - impl := controller.NewImpl(r, r.Logger, ReconcilerName, reconciler.MustNewStatsReporter(ReconcilerName, r.Logger)) - - r.Logger.Info("Setting up event handlers") - - if controlled { - informer.AddEventHandler(reconciler.Handler(impl.EnqueueControllerOf)) - } else { - informer.AddEventHandler(reconciler.Handler(impl.Enqueue)) - } - return impl -} - -// Reconciler reconciles an ApiServerSource object -type Reconciler struct { - *reconciler.Base - - eventsClient cloudevents.Client - lister cache.GenericLister -} - -// Reconcile sends a cloud event corresponding to the given key -func (r *Reconciler) Reconcile(ctx context.Context, key string) error { - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) +func (a *adapter) deleteEvent(obj interface{}) { + event, err := a.makeDeleteEvent(obj) if err != nil { - r.Logger.Errorf("invalid resource key: %s", key) - return nil - } - - // Get the resource with this namespace/name - original, err := r.lister.ByNamespace(namespace).Get(name) - if apierrs.IsNotFound(err) { - // The resource may no longer exist, in which case we stop processing. - r.Logger.Error("resource key in work queue no longer exists", zap.Any("key", key)) - return nil - } else if err != nil { - return err - } - - object := original.(*duckv1alpha1.AddressableType) - - eventType := updateEventType - timestamp := object.GetCreationTimestamp() - if object.GetDeletionTimestamp() != nil { - eventType = deleteEventType - timestamp = *object.GetDeletionTimestamp() - } - - objectRef := corev1.ObjectReference{ - APIVersion: object.APIVersion, - Kind: object.Kind, - Name: object.GetName(), - Namespace: object.GetNamespace(), - } - - event := cloudevents.Event{ - Context: cloudevents.EventContextV02{ - ID: string(object.GetUID()), - Type: eventType, - Source: *types.ParseURLRef(object.GetSelfLink()), - Time: &types.Timestamp{Time: timestamp.Time}, - }.AsV02(), - Data: objectRef, + a.logger.Info("event creation failed", zap.Error(err)) + return } - if _, err := r.eventsClient.Send(ctx, event); err != nil { - r.Logger.Error("failed to send cloudevent (retrying)", err) - - return err + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) } - - return nil } diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 171755796d2..6f0a0cd8ffd 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -16,188 +16,4 @@ limitations under the License. package apiserver -import ( - "context" - "io/ioutil" - "net/http" - "net/http/httptest" - gotesting "testing" - "time" - - "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json" - "github.com/google/go-cmp/cmp" - "github.com/knative/eventing/pkg/kncloudevents" - "github.com/knative/eventing/pkg/reconciler" - "github.com/knative/pkg/apis/duck" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - logtesting "github.com/knative/pkg/logging/testing" - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - fakedynamicclientset "k8s.io/client-go/dynamic/fake" - fakekubeclientset "k8s.io/client-go/kubernetes/fake" -) - -const ( - sourceName = "test-apiserver-adapter" - sourceUID = "1234-5678-90" - testNS = "testnamespace" -) - -type testCase struct { - // Name is a descriptive name for this test suitable as a first argument to t.Run() - Name string - - // InitialState is the list of objects that already exists when reconciliation - // starts. - InitialState []runtime.Object - - // Key is the parameter to reconciliation. - // This has the form "namespace/name". - Key string - - // Where to send events - sink func(http.ResponseWriter, *http.Request) - - // Expected event data - data interface{} -} - -func TestReconcile(t *gotesting.T) { - table := []testCase{ - { - Name: "Receive Pod creation event", - InitialState: []runtime.Object{ - getPod(), - }, - Key: testNS + "/" + sourceName, - - sink: sinkAccepted, - data: decode(t, encode(t, getPodRef())), - }, - } - - for _, tc := range table { - t.Run(tc.Name, func(t *gotesting.T) { - // Create fake sink server - h := &fakeHandler{ - handler: tc.sink, - } - - sinkServer := httptest.NewServer(h) - defer sinkServer.Close() - - // Bind cloud event client - ceClient, err := kncloudevents.NewDefaultClient(sinkServer.URL) - if err != nil { - t.Errorf("cannot create cloud event client: %v", zap.Error(err)) - } - - // Create fake dynamic client - dynamicScheme := runtime.NewScheme() - client := fakedynamicclientset.NewSimpleDynamicClient(dynamicScheme, tc.InitialState...) - - stopCh := make(chan struct{}) - defer close(stopCh) - - tif := &duck.TypedInformerFactory{ - Client: client, - Type: &duckv1alpha1.AddressableType{}, - ResyncPeriod: 1 * time.Second, - StopChannel: stopCh, - } - - _, lister, err := tif.Get(schema.GroupVersionResource{Group: "", Resource: "pods", Version: "v1"}) - if err != nil { - t.Fatalf("Get() = %v", err) - } - - opt := reconciler.Options{ - KubeClientSet: fakekubeclientset.NewSimpleClientset(), - Logger: logtesting.TestLogger(t), - } - - r := &Reconciler{ - Base: reconciler.NewBase(opt, controllerAgentName), - eventsClient: ceClient, - lister: lister, - } - ctx := context.Background() - - err = r.Reconcile(ctx, tc.Key) - if err != nil { - t.Errorf("Expected no error") - } - - if diff := cmp.Diff(tc.data, decode(t, h.body)); diff != "" { - t.Errorf("incorrect event (-want, +got): %v", diff) - } - }) - } - -} - -func getPod() runtime.Object { - return &unstructured.Unstructured{ - Object: map[string]interface{}{ - "apiVersion": "v1", - "kind": "Pod", - "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - "selfLink": "/apis/v1/namespaces/" + testNS + "/pod/" + sourceName, - }, - }, - } -} - -func getPodRef() corev1.ObjectReference { - return corev1.ObjectReference{ - APIVersion: "v1", - Kind: "Pod", - Name: sourceName, - Namespace: testNS, - } -} - -type fakeHandler struct { - body []byte - header http.Header - - handler func(http.ResponseWriter, *http.Request) -} - -func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.header = r.Header - body, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, "can not read body", http.StatusBadRequest) - return - } - h.body = body - defer r.Body.Close() - h.handler(w, r) -} - -func sinkAccepted(writer http.ResponseWriter, req *http.Request) { - writer.WriteHeader(http.StatusOK) -} - -func encode(t *gotesting.T, data interface{}) string { - b, err := json.Encode(data) - if err != nil { - t.Fatalf("failed to encode data: %v", err) - } - return string(b) -} - -func decode(t *gotesting.T, data interface{}) interface{} { - var out interface{} - err := json.Decode(data, &out) - if err != nil { - t.Fatalf("failed to decode data: %v", err) - } - return out -} +// TODO diff --git a/pkg/adapter/apiserver/event.go b/pkg/adapter/apiserver/event.go new file mode 100644 index 00000000000..e00ceab8de3 --- /dev/null +++ b/pkg/adapter/apiserver/event.go @@ -0,0 +1,102 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + cloudevents "github.com/cloudevents/sdk-go" + duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + corev1 "k8s.io/api/core/v1" + "strings" +) + +const ( + addEventType = "dev.knative.apiserver.object.add" + updateEventType = "dev.knative.apiserver.object.update" + deleteEventType = "dev.knative.apiserver.object.delete" +) + +type KubernetesEvent struct { + Object *duckv1alpha1.KResource `json:"obj,omitempty"` + NewObject *duckv1alpha1.KResource `json:"newObj,omitempty"` + OldObject *duckv1alpha1.KResource `json:"oldObj,omitempty"` +} + +func (a *adapter) makeAddEvent(obj interface{}) (*cloudevents.Event, error) { + object := obj.(*duckv1alpha1.KResource) + + return a.makeEvent(addEventType, object, &KubernetesEvent{ + Object: object, + }) +} + +func (a *adapter) makeUpdateEvent(oldObj, newObj interface{}) (*cloudevents.Event, error) { + object := newObj.(*duckv1alpha1.KResource) + + return a.makeEvent(updateEventType, object, &KubernetesEvent{ + NewObject: object, + OldObject: oldObj.(*duckv1alpha1.KResource), + }) +} + +func (a *adapter) makeDeleteEvent(obj interface{}) (*cloudevents.Event, error) { + object := obj.(*duckv1alpha1.KResource) + + return a.makeEvent(deleteEventType, object, &KubernetesEvent{ + Object: object, + }) +} + +func (a *adapter) makeEvent(eventType string, obj *duckv1alpha1.KResource, data *KubernetesEvent) (*cloudevents.Event, error) { + subject := createSelfLink(corev1.ObjectReference{ + APIVersion: obj.APIVersion, + Kind: obj.Kind, + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }) + + event := cloudevents.NewEvent() + event.SetType(eventType) + event.SetSource(a.source) + event.SetSubject(subject) + + if err := event.SetData(data); err != nil { + a.logger.Warn("failed to set event data for kubernetes event") + return nil, err + } + return &event, nil +} + +// Creates a URI of the form found in object metadata selfLinks +// Format looks like: /apis/feeds.knative.dev/v1alpha1/namespaces/default/feeds/k8s-events-example +// KNOWN ISSUES: +// * ObjectReference.APIVersion has no version information (e.g. serving.knative.dev rather than serving.knative.dev/v1alpha1) +// * ObjectReference does not have enough information to create the pluaralized list type (e.g. "revisions" from kind: Revision) +// +// Track these issues at https://github.com/kubernetes/kubernetes/issues/66313 +// We could possibly work around this by adding a lister for the resources referenced by these events. +func createSelfLink(o corev1.ObjectReference) string { + collectionNameHack := strings.ToLower(o.Kind) + "s" + versionNameHack := o.APIVersion + + // Core API types don't have a separate package name and only have a version string (e.g. /apis/v1/namespaces/default/pods/myPod) + // To avoid weird looking strings like "v1/versionUnknown" we'll sniff for a "." in the version + if strings.Contains(versionNameHack, ".") && !strings.Contains(versionNameHack, "/") { + versionNameHack = versionNameHack + "/versionUnknown" + } + return fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", versionNameHack, o.Namespace, collectionNameHack, o.Name) +} From 867c01b6a4fa345ed5657ba159df73ccdfbf5d34 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 15:06:08 -0700 Subject: [PATCH 05/16] Working with controller flag. --- Gopkg.lock | 4 - cmd/apiserver_receive_adapter/main.go | 17 +- config/300-apiserversource.yaml | 5 +- pkg/adapter/apiserver/adapter.go | 163 ++++++++++++------ pkg/adapter/apiserver/event.go | 102 ----------- pkg/adapter/apiserver/events/events.go | 142 +++++++++++++++ pkg/adapter/apiserver/ref.go | 96 +++++++++++ pkg/adapter/apiserver/resource.go | 72 ++++++++ pkg/apis/sources/v1alpha1/apiserver_types.go | 5 + .../resources/receive_adapter.go | 3 + sample/apiserver_pod.yaml | 1 + .../github.com/knative/pkg/apis/duck/typed.go | 22 --- 12 files changed, 443 insertions(+), 189 deletions(-) delete mode 100644 pkg/adapter/apiserver/event.go create mode 100644 pkg/adapter/apiserver/events/events.go create mode 100644 pkg/adapter/apiserver/ref.go create mode 100644 pkg/adapter/apiserver/resource.go diff --git a/Gopkg.lock b/Gopkg.lock index 20a6e81ce8b..d6e6c86cf14 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1379,11 +1379,7 @@ "github.com/Shopify/sarama", "github.com/bsm/sarama-cluster", "github.com/cloudevents/sdk-go", - "github.com/cloudevents/sdk-go/pkg/cloudevents", - "github.com/cloudevents/sdk-go/pkg/cloudevents/client", - "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", - "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 8557671f003..f4552c6ca76 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -40,6 +40,7 @@ var ( type envConfig struct { Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` + Mode string `envconfig:"MODE"` SinkURI string `split_words:"true" required:"true"` ApiVersion []string `split_words:"true" required:"true"` Kind []string `required:"true"` @@ -83,10 +84,11 @@ func main() { logger.Fatalw("Error building cloud event client", zap.Error(err)) } - gvrs := []schema.GroupVersionResource(nil) + gvrcs := []apiserver.GVRC(nil) for i, apiVersion := range env.ApiVersion { kind := env.Kind[i] + controlled := env.Controller[i] gv, err := schema.ParseGroupVersion(apiVersion) if err != nil { @@ -94,10 +96,19 @@ func main() { } // This is really bad. gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) - gvrs = append(gvrs, gvr) + gvrcs = append(gvrcs, apiserver.GVRC{ + GVR: gvr, + Controller: controlled, + }) } - a := apiserver.NewAdaptor(cfg.Host, env.Namespace, client, eventsClient, logger, gvrs...) + opt := apiserver.Options{ + Namespace: env.Namespace, + Mode: env.Mode, + GVRCs: gvrcs, + } + + a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt) logger.Info("starting kubernetes api adapter") if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) diff --git a/config/300-apiserversource.yaml b/config/300-apiserversource.yaml index 1d1ac990297..44b051ca3bc 100644 --- a/config/300-apiserversource.yaml +++ b/config/300-apiserversource.yaml @@ -50,6 +50,9 @@ spec: sink: type: object description: "A reference to the object that should receive events." + mode: + type: string + description: "Mode controls the content of the event payload. One of: 'Ref' (only references of resources), 'Resource' (full resource as ResourceEvent)." resources: items: properties: @@ -61,7 +64,7 @@ spec: description: "Kind of the objects to watch." controller: type: boolean - description: "If true, watch the managing controller. More info: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/" + description: "If true, emits the managing controller ref. Only supported for mode=Ref. More info: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/" type: array required: - resources diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 5e344f67918..df8dcadd318 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -17,16 +17,18 @@ limitations under the License. package apiserver import ( - "context" + "fmt" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" cloudevents "github.com/cloudevents/sdk-go" - "github.com/knative/pkg/apis/duck" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" ) @@ -34,60 +36,125 @@ type Adapter interface { Start(stopCh <-chan struct{}) error } +const ( + // RefMode produces payloads of ObjectReference + RefMode = "Ref" + // ResourceMode produces payloads of ResourceEvent + ResourceMode = "Resource" +) + +// Options hold the options for the Adapter. +type Options struct { + Mode string + Namespace string + GVRCs []GVRC +} + +// GVRC is a pairing of GroupVersionResource and Controller flag. +type GVRC struct { + GVR schema.GroupVersionResource + Controller bool +} + type adapter struct { - gvrs []schema.GroupVersionResource + gvrcs []GVRC k8s dynamic.Interface ce cloudevents.Client source string namespace string logger *zap.SugaredLogger + + mode string + delegate eventDelegate } -func NewAdaptor(source, namespace string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, gvr ...schema.GroupVersionResource) Adapter { +func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter { + mode := opt.Mode + switch mode { + case ResourceMode, RefMode: + // ok + default: + logger.Warn("unknown mode ", mode) + mode = RefMode + logger.Warn("defaulting mode to ", mode) + } + a := &adapter{ k8s: k8sClient, ce: ceClient, source: source, - namespace: namespace, - gvrs: gvr, logger: logger, + gvrcs: opt.GVRCs, + namespace: opt.Namespace, + mode: mode, } return a } -/* -TODO: No longer sending events for the controller of the updated object, a al: +type eventDelegate interface { + addEvent(obj interface{}) + updateEvent(oldObj, newObj interface{}) + deleteEvent(obj interface{}) - if controlled { - informer.AddEventHandler(reconciler.Handler(impl.EnqueueControllerOf)) - } else { - informer.AddEventHandler(reconciler.Handler(impl.Enqueue)) - } - -*/ + addControllerWatch(gvr schema.GroupVersionResource) +} func (a *adapter) Start(stopCh <-chan struct{}) error { // Local stop channel. stop := make(chan struct{}) - factory := duck.TypedInformerFactory{ - Client: a.k8s, - ResyncPeriod: time.Duration(10 * time.Hour), - StopChannel: stop, - Type: &duckv1alpha1.KResource{}, + resyncPeriod := time.Duration(10 * time.Hour) + + var d eventDelegate + switch a.mode { + case ResourceMode: + d = &resource{ + ce: a.ce, + source: a.source, + logger: a.logger, + } + + case RefMode: + d = &ref{ + ce: a.ce, + source: a.source, + logger: a.logger, + } + + default: + a.logger.Fatal("mode not understood", a.mode) } - for _, gvr := range a.gvrs { - informer, _, err := factory.GetNamespaced(gvr, a.namespace) - if err != nil { - return err + for _, gvrc := range a.gvrcs { + var informer cache.SharedIndexInformer + + lw := &cache.ListWatch{ + ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List), + WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch), + } + informer = cache.NewSharedIndexInformer(lw, &unstructured.Unstructured{}, resyncPeriod, cache.Indexers{ + cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, + }) + + go informer.Run(stopCh) + + if ok := cache.WaitForCacheSync(stopCh, informer.HasSynced); !ok { + return fmt.Errorf("failed starting shared index informer for %v with type %T on namespace %s", gvrc.GVR, a.namespace) + } + + // Double check the delegate is not nil. + if d == nil { + continue } informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: a.addEvent, - UpdateFunc: a.updateEvent, - DeleteFunc: a.deleteEvent, + AddFunc: d.addEvent, + UpdateFunc: d.updateEvent, + DeleteFunc: d.deleteEvent, }) + if gvrc.Controller { + d.addControllerWatch(gvrc.GVR) + } } <-stopCh @@ -95,38 +162,20 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { return nil } -func (a *adapter) addEvent(obj interface{}) { - event, err := a.makeAddEvent(obj) - if err != nil { - a.logger.Info("event creation failed", zap.Error(err)) - return - } +type unstructuredLister func(metav1.ListOptions) (*unstructured.UnstructuredList, error) - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - } -} - -func (a *adapter) updateEvent(oldObj, newObj interface{}) { - event, err := a.makeUpdateEvent(oldObj, newObj) - if err != nil { - a.logger.Info("event creation failed", zap.Error(err)) - return - } - - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) +func asUnstructuredLister(ulist unstructuredLister) cache.ListFunc { + return func(opts metav1.ListOptions) (runtime.Object, error) { + ul, err := ulist(opts) + if err != nil { + return nil, err + } + return ul, nil } } -func (a *adapter) deleteEvent(obj interface{}) { - event, err := a.makeDeleteEvent(obj) - if err != nil { - a.logger.Info("event creation failed", zap.Error(err)) - return - } - - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) +func asUnstructuredWatcher(wf cache.WatchFunc) cache.WatchFunc { + return func(lo metav1.ListOptions) (watch.Interface, error) { + return wf(lo) } } diff --git a/pkg/adapter/apiserver/event.go b/pkg/adapter/apiserver/event.go deleted file mode 100644 index e00ceab8de3..00000000000 --- a/pkg/adapter/apiserver/event.go +++ /dev/null @@ -1,102 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "fmt" - cloudevents "github.com/cloudevents/sdk-go" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - corev1 "k8s.io/api/core/v1" - "strings" -) - -const ( - addEventType = "dev.knative.apiserver.object.add" - updateEventType = "dev.knative.apiserver.object.update" - deleteEventType = "dev.knative.apiserver.object.delete" -) - -type KubernetesEvent struct { - Object *duckv1alpha1.KResource `json:"obj,omitempty"` - NewObject *duckv1alpha1.KResource `json:"newObj,omitempty"` - OldObject *duckv1alpha1.KResource `json:"oldObj,omitempty"` -} - -func (a *adapter) makeAddEvent(obj interface{}) (*cloudevents.Event, error) { - object := obj.(*duckv1alpha1.KResource) - - return a.makeEvent(addEventType, object, &KubernetesEvent{ - Object: object, - }) -} - -func (a *adapter) makeUpdateEvent(oldObj, newObj interface{}) (*cloudevents.Event, error) { - object := newObj.(*duckv1alpha1.KResource) - - return a.makeEvent(updateEventType, object, &KubernetesEvent{ - NewObject: object, - OldObject: oldObj.(*duckv1alpha1.KResource), - }) -} - -func (a *adapter) makeDeleteEvent(obj interface{}) (*cloudevents.Event, error) { - object := obj.(*duckv1alpha1.KResource) - - return a.makeEvent(deleteEventType, object, &KubernetesEvent{ - Object: object, - }) -} - -func (a *adapter) makeEvent(eventType string, obj *duckv1alpha1.KResource, data *KubernetesEvent) (*cloudevents.Event, error) { - subject := createSelfLink(corev1.ObjectReference{ - APIVersion: obj.APIVersion, - Kind: obj.Kind, - Name: obj.GetName(), - Namespace: obj.GetNamespace(), - }) - - event := cloudevents.NewEvent() - event.SetType(eventType) - event.SetSource(a.source) - event.SetSubject(subject) - - if err := event.SetData(data); err != nil { - a.logger.Warn("failed to set event data for kubernetes event") - return nil, err - } - return &event, nil -} - -// Creates a URI of the form found in object metadata selfLinks -// Format looks like: /apis/feeds.knative.dev/v1alpha1/namespaces/default/feeds/k8s-events-example -// KNOWN ISSUES: -// * ObjectReference.APIVersion has no version information (e.g. serving.knative.dev rather than serving.knative.dev/v1alpha1) -// * ObjectReference does not have enough information to create the pluaralized list type (e.g. "revisions" from kind: Revision) -// -// Track these issues at https://github.com/kubernetes/kubernetes/issues/66313 -// We could possibly work around this by adding a lister for the resources referenced by these events. -func createSelfLink(o corev1.ObjectReference) string { - collectionNameHack := strings.ToLower(o.Kind) + "s" - versionNameHack := o.APIVersion - - // Core API types don't have a separate package name and only have a version string (e.g. /apis/v1/namespaces/default/pods/myPod) - // To avoid weird looking strings like "v1/versionUnknown" we'll sniff for a "." in the version - if strings.Contains(versionNameHack, ".") && !strings.Contains(versionNameHack, "/") { - versionNameHack = versionNameHack + "/versionUnknown" - } - return fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", versionNameHack, o.Namespace, collectionNameHack, o.Name) -} diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go new file mode 100644 index 00000000000..09ec771ccb7 --- /dev/null +++ b/pkg/adapter/apiserver/events/events.go @@ -0,0 +1,142 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + cloudevents "github.com/cloudevents/sdk-go" +) + +const ( + addEventType = "dev.knative.apiserver.resource.add" + updateEventType = "dev.knative.apiserver.resource.update" + deleteEventType = "dev.knative.apiserver.resource.delete" + + addEventRefType = "dev.knative.apiserver.ref.add" + updateEventRefType = "dev.knative.apiserver.ref.update" + deleteEventRefType = "dev.knative.apiserver.ref.delete" +) + +type ResourceEvent struct { + Object *unstructured.Unstructured `json:"obj,omitempty"` + NewObject *unstructured.Unstructured `json:"newObj,omitempty"` + OldObject *unstructured.Unstructured `json:"oldObj,omitempty"` +} + +func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { + object := obj.(*unstructured.Unstructured) + + return makeEvent(source, addEventType, object, &ResourceEvent{ + Object: object, + }) +} + +func MakeUpdateEvent(source string, oldObj, newObj interface{}) (*cloudevents.Event, error) { + object := newObj.(*unstructured.Unstructured) + + return makeEvent(source, updateEventType, object, &ResourceEvent{ + NewObject: object, + OldObject: oldObj.(*unstructured.Unstructured), + }) +} + +func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { + object := obj.(*unstructured.Unstructured) + + return makeEvent(source, deleteEventType, object, &ResourceEvent{ + Object: object, + }) +} + +func getRef(object *unstructured.Unstructured, controller bool) corev1.ObjectReference { + if controller { + if owner := metav1.GetControllerOf(object); owner != nil { + return corev1.ObjectReference{ + APIVersion: owner.APIVersion, + Kind: owner.Kind, + Name: owner.Name, + Namespace: object.GetNamespace(), + } + } + } + return corev1.ObjectReference{ + APIVersion: object.GetAPIVersion(), + Kind: object.GetKind(), + Name: object.GetName(), + Namespace: object.GetNamespace(), + } +} + +func MakeAddRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + object := obj.(*unstructured.Unstructured) + return makeEvent(source, addEventRefType, object, getRef(object, asController)) +} + +func MakeUpdateRefEvent(source string, asController bool, oldObj, newObj interface{}) (*cloudevents.Event, error) { + object := newObj.(*unstructured.Unstructured) + return makeEvent(source, updateEventRefType, object, getRef(object, asController)) +} + +func MakeDeleteRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + object := obj.(*unstructured.Unstructured) + return makeEvent(source, deleteEventRefType, object, getRef(object, asController)) +} + +func makeEvent(source, eventType string, obj *unstructured.Unstructured, data interface{}) (*cloudevents.Event, error) { + subject := createSelfLink(corev1.ObjectReference{ + APIVersion: obj.GetAPIVersion(), + Kind: obj.GetKind(), + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }) + + event := cloudevents.NewEvent() + event.SetType(eventType) + event.SetSource(source) + event.SetSubject(subject) + + if err := event.SetData(data); err != nil { + return nil, err + } + return &event, nil +} + +// Creates a URI of the form found in object metadata selfLinks +// Format looks like: /apis/feeds.knative.dev/v1alpha1/namespaces/default/feeds/k8s-events-example +// KNOWN ISSUES: +// * ObjectReference.APIVersion has no version information (e.g. serving.knative.dev rather than serving.knative.dev/v1alpha1) +// * ObjectReference does not have enough information to create the pluaralized list type (e.g. "revisions" from kind: Revision) +// +// Track these issues at https://github.com/kubernetes/kubernetes/issues/66313 +// We could possibly work around this by adding a lister for the resources referenced by these events. +func createSelfLink(o corev1.ObjectReference) string { + collectionNameHack := strings.ToLower(o.Kind) + "s" + versionNameHack := o.APIVersion + + // Core API types don't have a separate package name and only have a version string (e.g. /apis/v1/namespaces/default/pods/myPod) + // To avoid weird looking strings like "v1/versionUnknown" we'll sniff for a "." in the version + if strings.Contains(versionNameHack, ".") && !strings.Contains(versionNameHack, "/") { + versionNameHack = versionNameHack + "/versionUnknown" + } + return fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", versionNameHack, o.Namespace, collectionNameHack, o.Name) +} diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go new file mode 100644 index 00000000000..793d2a30181 --- /dev/null +++ b/pkg/adapter/apiserver/ref.go @@ -0,0 +1,96 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "reflect" + + cloudevents "github.com/cloudevents/sdk-go" + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "go.uber.org/zap" +) + +type ref struct { + ce cloudevents.Client + source string + logger *zap.SugaredLogger + + controlledGVRs []schema.GroupVersionResource +} + +func (a *ref) asController(obj interface{}) bool { + if len(a.controlledGVRs) > 0 { + object := obj.(*unstructured.Unstructured) + gvk := object.GroupVersionKind() + // This is really bad. + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + for _, gvrc := range a.controlledGVRs { + if reflect.DeepEqual(gvr, gvrc) { + return true + } + } + } + return false +} + +func (a *ref) addEvent(obj interface{}) { + event, err := events.MakeAddRefEvent(a.source, a.asController(obj), obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *ref) updateEvent(oldObj, newObj interface{}) { + event, err := events.MakeUpdateRefEvent(a.source, a.asController(newObj), oldObj, newObj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *ref) deleteEvent(obj interface{}) { + event, err := events.MakeDeleteRefEvent(a.source, a.asController(obj), obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { + if a.controlledGVRs == nil { + a.controlledGVRs = []schema.GroupVersionResource{gvr} + return + } + a.controlledGVRs = append(a.controlledGVRs, gvr) +} diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go new file mode 100644 index 00000000000..724525e45ef --- /dev/null +++ b/pkg/adapter/apiserver/resource.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + cloudevents "github.com/cloudevents/sdk-go" + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type resource struct { + ce cloudevents.Client + source string + logger *zap.SugaredLogger +} + +func (a *resource) addEvent(obj interface{}) { + event, err := events.MakeAddEvent(a.source, obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *resource) updateEvent(oldObj, newObj interface{}) { + event, err := events.MakeUpdateEvent(a.source, oldObj, newObj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *resource) deleteEvent(obj interface{}) { + event, err := events.MakeDeleteEvent(a.source, obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + } +} + +func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { + // not supported for resource. + a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) +} diff --git a/pkg/apis/sources/v1alpha1/apiserver_types.go b/pkg/apis/sources/v1alpha1/apiserver_types.go index 344a0cea997..edd632fc175 100644 --- a/pkg/apis/sources/v1alpha1/apiserver_types.go +++ b/pkg/apis/sources/v1alpha1/apiserver_types.go @@ -67,6 +67,11 @@ type ApiServerSourceSpec struct { // Sink is a reference to an object that will resolve to a domain name to use as the sink. // +optional Sink *corev1.ObjectReference `json:"sink,omitempty"` + + // Mode is the mode the receive adapter controller runs under: Ref or Resource. + // `Ref` sends only the reference to the resource. + // `Resource` send the full resource. + Mode string `json:"mode,omitempty"` } // ApiServerSourceStatus defines the observed state of ApiServerSource diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 3fcebdd214b..fe6b1daddbb 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -95,6 +95,9 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar return []corev1.EnvVar{{ Name: "SINK_URI", Value: sinkURI, + }, { + Name: "MODE", + Value: spec.Mode, }, { Name: "API_VERSION", Value: apiversions, diff --git a/sample/apiserver_pod.yaml b/sample/apiserver_pod.yaml index 501c08e2d95..109dbbf8cee 100644 --- a/sample/apiserver_pod.yaml +++ b/sample/apiserver_pod.yaml @@ -7,6 +7,7 @@ spec: resources: - apiVersion: v1 kind: Pod + mode: "Resource" sink: apiVersion: eventing.knative.dev/v1alpha1 kind: Broker diff --git a/vendor/github.com/knative/pkg/apis/duck/typed.go b/vendor/github.com/knative/pkg/apis/duck/typed.go index 92a0fbc560d..9d29c1e0bdc 100644 --- a/vendor/github.com/knative/pkg/apis/duck/typed.go +++ b/vendor/github.com/knative/pkg/apis/duck/typed.go @@ -66,28 +66,6 @@ func (dif *TypedInformerFactory) Get(gvr schema.GroupVersionResource) (cache.Sha return inf, lister, nil } -// Get implements InformerFactory. -func (dif *TypedInformerFactory) GetNamespaced(gvr schema.GroupVersionResource, namespace string) (cache.SharedIndexInformer, cache.GenericNamespaceLister, error) { - listObj := dif.Type.GetListType() - lw := &cache.ListWatch{ - ListFunc: asStructuredLister(dif.Client.Resource(gvr).Namespace(namespace).List, listObj), - WatchFunc: AsStructuredWatcher(dif.Client.Resource(gvr).Namespace(namespace).Watch, dif.Type), - } - inf := cache.NewSharedIndexInformer(lw, dif.Type, dif.ResyncPeriod, cache.Indexers{ - cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - }) - - lister := cache.NewGenericLister(inf.GetIndexer(), gvr.GroupResource()).ByNamespace(namespace) - - go inf.Run(dif.StopChannel) - - if ok := cache.WaitForCacheSync(dif.StopChannel, inf.HasSynced); !ok { - return nil, nil, fmt.Errorf("failed starting shared index informer for %v with type %T on namespace %s", gvr, dif.Type, namespace) - } - - return inf, lister, nil -} - type unstructuredLister func(metav1.ListOptions) (*unstructured.UnstructuredList, error) func asStructuredLister(ulist unstructuredLister, listObj runtime.Object) cache.ListFunc { From 6ce8e88369947eceec9ac4e4c487bde71932b0a7 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 15:08:30 -0700 Subject: [PATCH 06/16] add todo. --- pkg/adapter/apiserver/ref.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 793d2a30181..19c2c3a8f86 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -36,6 +36,10 @@ type ref struct { controlledGVRs []schema.GroupVersionResource } +// TODO: I think asController is not the feature we want. I think we want to be +// able to set the controller as a filter to the watch. Not emit all owners of +// the resource. Fix this. It has to be an api change on the CRD. + func (a *ref) asController(obj interface{}) bool { if len(a.controlledGVRs) > 0 { object := obj.(*unstructured.Unstructured) From e858e4b679f03accd0ea58cd37137c80a2a19e31 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 15:12:13 -0700 Subject: [PATCH 07/16] newline. --- sample/apiserver_pod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample/apiserver_pod.yaml b/sample/apiserver_pod.yaml index 109dbbf8cee..a97b1b918c1 100644 --- a/sample/apiserver_pod.yaml +++ b/sample/apiserver_pod.yaml @@ -44,4 +44,4 @@ roleRef: subjects: - kind: ServiceAccount name: events-sa - namespace: default \ No newline at end of file + namespace: default From 37a05ab602b053307ab6ebed734e61f9e457a3ca Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 7 May 2019 18:05:07 -0700 Subject: [PATCH 08/16] fix the test. --- .../apiserversource/resources/receive_adapter.go | 9 +-------- .../apiserversource/resources/receive_adapter_test.go | 4 +++- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index fe6b1daddbb..7fd4223c9e0 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -56,7 +56,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - "sidecar.istio.io/inject": "false", + "sidecar.istio.io/inject": "false", // needs to talk to the api server. }, Labels: args.Labels, }, @@ -114,12 +114,5 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar FieldPath: "metadata.namespace", }, }, - }, { - Name: "SYSTEM_NAME", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.name", - }, - }, }} } diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index f6574ec8ee6..0b72cb128ea 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -89,7 +89,7 @@ func TestMakeReceiveAdapter(t *testing.T) { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - "sidecar.istio.io/inject": "true", + "sidecar.istio.io/inject": "false", }, Labels: map[string]string{ "test-key1": "test-value1", @@ -106,6 +106,8 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "SINK_URI", Value: "sink-uri", + }, { + Name: "MODE", }, { Name: "API_VERSION", Value: ",", From f8d463ce954e2026bb48215c6462dac1a658aced Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 8 May 2019 07:48:32 -0700 Subject: [PATCH 09/16] fix format. --- pkg/adapter/apiserver/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index df8dcadd318..4d74977e11f 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -139,7 +139,7 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { go informer.Run(stopCh) if ok := cache.WaitForCacheSync(stopCh, informer.HasSynced); !ok { - return fmt.Errorf("failed starting shared index informer for %v with type %T on namespace %s", gvrc.GVR, a.namespace) + return fmt.Errorf("failed starting shared index informer for %s on namespace %s", gvrc.GVR.String(), a.namespace) } // Double check the delegate is not nil. From 5f2a78b9e399cc35b57de7f7bc1354b2ea736ef4 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 8 May 2019 09:23:38 -0700 Subject: [PATCH 10/16] adding tests for event generation. --- pkg/adapter/apiserver/events/events.go | 32 +- pkg/adapter/apiserver/events/events_test.go | 389 ++++++++++++++++++++ 2 files changed, 416 insertions(+), 5 deletions(-) create mode 100644 pkg/adapter/apiserver/events/events_test.go diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go index 09ec771ccb7..bad9e5a4b5b 100644 --- a/pkg/adapter/apiserver/events/events.go +++ b/pkg/adapter/apiserver/events/events.go @@ -44,6 +44,9 @@ type ResourceEvent struct { } func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } object := obj.(*unstructured.Unstructured) return makeEvent(source, addEventType, object, &ResourceEvent{ @@ -52,15 +55,25 @@ func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { } func MakeUpdateEvent(source string, oldObj, newObj interface{}) (*cloudevents.Event, error) { + if newObj == nil { + return nil, fmt.Errorf("new resource can not be nil") + } object := newObj.(*unstructured.Unstructured) - return makeEvent(source, updateEventType, object, &ResourceEvent{ + data := &ResourceEvent{ NewObject: object, - OldObject: oldObj.(*unstructured.Unstructured), - }) + } + if oldObj != nil { + data.OldObject = oldObj.(*unstructured.Unstructured) + } + + return makeEvent(source, updateEventType, object, data) } func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } object := obj.(*unstructured.Unstructured) return makeEvent(source, deleteEventType, object, &ResourceEvent{ @@ -68,8 +81,8 @@ func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) }) } -func getRef(object *unstructured.Unstructured, controller bool) corev1.ObjectReference { - if controller { +func getRef(object *unstructured.Unstructured, asController bool) corev1.ObjectReference { + if asController { if owner := metav1.GetControllerOf(object); owner != nil { return corev1.ObjectReference{ APIVersion: owner.APIVersion, @@ -88,16 +101,25 @@ func getRef(object *unstructured.Unstructured, controller bool) corev1.ObjectRef } func MakeAddRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } object := obj.(*unstructured.Unstructured) return makeEvent(source, addEventRefType, object, getRef(object, asController)) } func MakeUpdateRefEvent(source string, asController bool, oldObj, newObj interface{}) (*cloudevents.Event, error) { + if newObj == nil { + return nil, fmt.Errorf("new resource can not be nil") + } object := newObj.(*unstructured.Unstructured) return makeEvent(source, updateEventRefType, object, getRef(object, asController)) } func MakeDeleteRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } object := obj.(*unstructured.Unstructured) return makeEvent(source, deleteEventRefType, object, getRef(object, asController)) } diff --git a/pkg/adapter/apiserver/events/events_test.go b/pkg/adapter/apiserver/events/events_test.go new file mode 100644 index 00000000000..c255bad8fd9 --- /dev/null +++ b/pkg/adapter/apiserver/events/events_test.go @@ -0,0 +1,389 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events_test + +import ( + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "strings" + "testing" + + cloudevents "github.com/cloudevents/sdk-go" + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/adapter/apiserver/events" +) + +func simplePod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": name, + }, + }, + } +} + +func simpleOwnedPod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": "owned", + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "apps/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "ReplicaSet", + "name": name, + "uid": "0c119059-7113-11e9-a6c5-42010a8a00ed", + }, + }, + }, + }, + } +} + +func TestMakeAddEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.add", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"obj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeAddEvent(tc.source, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeUpdateEvent(t *testing.T) { + testCases := map[string]struct { + oldObj interface{} + newObj interface{} + source string + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "new resource can not be nil", + }, + "simple pod": { + source: "unit-test", + oldObj: simplePod("unit", "test"), + newObj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"newObj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}},"oldObj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + }, + "nil old": { + source: "unit-test", + newObj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"newObj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeUpdateEvent(tc.source, tc.oldObj, tc.newObj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeDeleteEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.delete", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"obj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeDeleteEvent(tc.source, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeAddRefEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + asController bool + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.add", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + "simple owned pod": { + source: "unit-test", + obj: simpleOwnedPod("unit", "test"), + asController: true, + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.add", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/owned", + }, + }.AsV02(), + }, + wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeAddRefEvent(tc.source, tc.asController, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeUpdateRefEvent(t *testing.T) { + testCases := map[string]struct { + oldObj interface{} + newObj interface{} + source string + asController bool + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "new resource can not be nil", + }, + "simple pod": { + source: "unit-test", + oldObj: simplePod("unit", "test"), + newObj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + "simple owned pod": { + source: "unit-test", + newObj: simpleOwnedPod("unit", "test"), + asController: true, + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/owned", + }, + }.AsV02(), + }, + wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, + }, + "nil old": { + source: "unit-test", + newObj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeUpdateRefEvent(tc.source, tc.asController, tc.oldObj, tc.newObj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeDeleteRefEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + asController bool + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.delete", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + "simple owned pod": { + source: "unit-test", + obj: simpleOwnedPod("unit", "test"), + asController: true, + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.delete", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/owned", + }, + }.AsV02(), + }, + wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeDeleteRefEvent(tc.source, tc.asController, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func validate(t *testing.T, got *cloudevents.Event, err error, want *cloudevents.Event, wantData, wantErr string) { + if wantErr != "" || err != nil { + var gotErr string + if err != nil { + gotErr = err.Error() + } + if !strings.Contains(wantErr, gotErr) { + diff := cmp.Diff(wantErr, gotErr) + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(cloudevents.Event{}, "Data", "DataEncoded")); diff != "" { + t.Errorf("unexpected event diff (-want, +got) = %v", diff) + } + + gotData := string(got.Data.([]byte)) + if diff := cmp.Diff(wantData, gotData); diff != "" { + t.Errorf("unexpected data diff (-want, +got) = %v", diff) + } +} From 7d46a6e6a5bb86deb83c6218bbfaad4db4b89875 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 8 May 2019 10:19:43 -0700 Subject: [PATCH 11/16] Adding tests for ref and resource adapter event emitters. --- pkg/adapter/apiserver/adapter_test.go | 83 +++++++++++++++++++++++- pkg/adapter/apiserver/events/events.go | 24 +++---- pkg/adapter/apiserver/ref_test.go | 76 ++++++++++++++++++++++ pkg/adapter/apiserver/resource_test.go | 48 ++++++++++++++ pkg/kncloudevents/testing/test_client.go | 37 +++++++++++ 5 files changed, 255 insertions(+), 13 deletions(-) create mode 100644 pkg/adapter/apiserver/ref_test.go create mode 100644 pkg/adapter/apiserver/resource_test.go create mode 100644 pkg/kncloudevents/testing/test_client.go diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 6f0a0cd8ffd..2233d0d46db 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -16,4 +16,85 @@ limitations under the License. package apiserver -// TODO +import ( + kncetesting "github.com/knative/eventing/pkg/kncloudevents/testing" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "testing" +) + +func simplePod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": name, + }, + }, + } +} + +func simpleOwnedPod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": "owned", + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "apps/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "ReplicaSet", + "name": name, + "uid": "0c119059-7113-11e9-a6c5-42010a8a00ed", + }, + }, + }, + }, + } +} + +func validateSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want string) { + if got := len(ce.Sent); got != 1 { + t.Errorf("Expected 1 event to be sent, got %d", got) + } + + if got := ce.Sent[0].Type(); got != want { + t.Errorf("Expected %q event to be sent, got %q", want, got) + } +} + +func validateNotSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want string) { + if got := len(ce.Sent); got != 0 { + t.Errorf("Expected 0 event to be sent, got %d", got) + } +} + +func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClient) { + ce := kncetesting.NewTestClient() + source := "unit-test" + logger := zap.NewExample().Sugar() + + return &resource{ + ce: ce, + source: source, + logger: logger, + }, ce +} + +func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) { + ce := kncetesting.NewTestClient() + source := "unit-test" + logger := zap.NewExample().Sugar() + + return &ref{ + ce: ce, + source: source, + logger: logger, + }, ce +} diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go index bad9e5a4b5b..8508a889716 100644 --- a/pkg/adapter/apiserver/events/events.go +++ b/pkg/adapter/apiserver/events/events.go @@ -28,13 +28,13 @@ import ( ) const ( - addEventType = "dev.knative.apiserver.resource.add" - updateEventType = "dev.knative.apiserver.resource.update" - deleteEventType = "dev.knative.apiserver.resource.delete" + AddEventType = "dev.knative.apiserver.resource.add" + UpdateEventType = "dev.knative.apiserver.resource.update" + DeleteEventType = "dev.knative.apiserver.resource.delete" - addEventRefType = "dev.knative.apiserver.ref.add" - updateEventRefType = "dev.knative.apiserver.ref.update" - deleteEventRefType = "dev.knative.apiserver.ref.delete" + AddEventRefType = "dev.knative.apiserver.ref.add" + UpdateEventRefType = "dev.knative.apiserver.ref.update" + DeleteEventRefType = "dev.knative.apiserver.ref.delete" ) type ResourceEvent struct { @@ -49,7 +49,7 @@ func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { } object := obj.(*unstructured.Unstructured) - return makeEvent(source, addEventType, object, &ResourceEvent{ + return makeEvent(source, AddEventType, object, &ResourceEvent{ Object: object, }) } @@ -67,7 +67,7 @@ func MakeUpdateEvent(source string, oldObj, newObj interface{}) (*cloudevents.Ev data.OldObject = oldObj.(*unstructured.Unstructured) } - return makeEvent(source, updateEventType, object, data) + return makeEvent(source, UpdateEventType, object, data) } func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { @@ -76,7 +76,7 @@ func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) } object := obj.(*unstructured.Unstructured) - return makeEvent(source, deleteEventType, object, &ResourceEvent{ + return makeEvent(source, DeleteEventType, object, &ResourceEvent{ Object: object, }) } @@ -105,7 +105,7 @@ func MakeAddRefEvent(source string, asController bool, obj interface{}) (*cloude return nil, fmt.Errorf("resource can not be nil") } object := obj.(*unstructured.Unstructured) - return makeEvent(source, addEventRefType, object, getRef(object, asController)) + return makeEvent(source, AddEventRefType, object, getRef(object, asController)) } func MakeUpdateRefEvent(source string, asController bool, oldObj, newObj interface{}) (*cloudevents.Event, error) { @@ -113,7 +113,7 @@ func MakeUpdateRefEvent(source string, asController bool, oldObj, newObj interfa return nil, fmt.Errorf("new resource can not be nil") } object := newObj.(*unstructured.Unstructured) - return makeEvent(source, updateEventRefType, object, getRef(object, asController)) + return makeEvent(source, UpdateEventRefType, object, getRef(object, asController)) } func MakeDeleteRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { @@ -121,7 +121,7 @@ func MakeDeleteRefEvent(source string, asController bool, obj interface{}) (*clo return nil, fmt.Errorf("resource can not be nil") } object := obj.(*unstructured.Unstructured) - return makeEvent(source, deleteEventRefType, object, getRef(object, asController)) + return makeEvent(source, DeleteEventRefType, object, getRef(object, asController)) } func makeEvent(source, eventType string, obj *unstructured.Unstructured, data interface{}) (*cloudevents.Event, error) { diff --git a/pkg/adapter/apiserver/ref_test.go b/pkg/adapter/apiserver/ref_test.go new file mode 100644 index 00000000000..af96ceb4829 --- /dev/null +++ b/pkg/adapter/apiserver/ref_test.go @@ -0,0 +1,76 @@ +package apiserver + +import ( + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" +) + +func TestRefAddEvent(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addEvent(simplePod("unit", "test")) + validateSent(t, ce, events.AddEventRefType) +} + +func TestRefUpdateEvent(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.updateEvent(nil, simplePod("unit", "test")) + validateSent(t, ce, events.UpdateEventRefType) +} + +func TestRefDeleteEvent(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.deleteEvent(simplePod("unit", "test")) + validateSent(t, ce, events.DeleteEventRefType) +} + +func TestRefAddEventNil(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addEvent(nil) + validateNotSent(t, ce, events.AddEventRefType) +} + +func TestRefUpdateEventNil(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.updateEvent(nil, nil) + validateNotSent(t, ce, events.UpdateEventRefType) +} + +func TestRefDeleteEventNil(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.deleteEvent(nil) + validateNotSent(t, ce, events.DeleteEventRefType) +} + +func TestRefAddEventAsController(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }) + d.addEvent(simpleOwnedPod("unit", "test")) + validateSent(t, ce, events.AddEventRefType) +} + +func TestRefUpdateEventAsController(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }) + d.updateEvent(nil, simpleOwnedPod("unit", "test")) + validateSent(t, ce, events.UpdateEventRefType) +} + +func TestRefDeleteEventAsController(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }) + d.deleteEvent(simpleOwnedPod("unit", "test")) + validateSent(t, ce, events.DeleteEventRefType) +} diff --git a/pkg/adapter/apiserver/resource_test.go b/pkg/adapter/apiserver/resource_test.go new file mode 100644 index 00000000000..363d035a09f --- /dev/null +++ b/pkg/adapter/apiserver/resource_test.go @@ -0,0 +1,48 @@ +package apiserver + +import ( + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" +) + +func TestResourceAddEvent(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.addEvent(simplePod("unit", "test")) + validateSent(t, ce, events.AddEventType) +} + +func TestResourceUpdateEvent(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.updateEvent(nil, simplePod("unit", "test")) + validateSent(t, ce, events.UpdateEventType) +} + +func TestResourceDeleteEvent(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.deleteEvent(simplePod("unit", "test")) + validateSent(t, ce, events.DeleteEventType) +} + +func TestResourceAddEventNil(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.addEvent(nil) + validateNotSent(t, ce, events.AddEventType) +} + +func TestResourceUpdateEventNil(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.updateEvent(nil, nil) + validateNotSent(t, ce, events.UpdateEventType) +} + +func TestResourceDeleteEventNil(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.deleteEvent(nil) + validateNotSent(t, ce, events.DeleteEventType) +} + +func TestResourceCoverageHacks(t *testing.T) { + d, _ := makeResourceAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{}) // for coverage. +} diff --git a/pkg/kncloudevents/testing/test_client.go b/pkg/kncloudevents/testing/test_client.go new file mode 100644 index 00000000000..97c3422ea14 --- /dev/null +++ b/pkg/kncloudevents/testing/test_client.go @@ -0,0 +1,37 @@ +package testing + +import ( + "context" + cloudevents "github.com/cloudevents/sdk-go" +) + +type TestCloudEventsClient struct { + Sent []cloudevents.Event +} + +var _ cloudevents.Client = (*TestCloudEventsClient)(nil) + +func (c *TestCloudEventsClient) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { + // TODO: improve later. + c.Sent = append(c.Sent, event) + return nil, nil +} + +func (c *TestCloudEventsClient) StartReceiver(ctx context.Context, fn interface{}) error { + // TODO: improve later. + <-ctx.Done() + return nil +} + +func (c *TestCloudEventsClient) Reset() { + c.Sent = make([]cloudevents.Event, 0) +} + +func NewTestClient() *TestCloudEventsClient { + + c := &TestCloudEventsClient{ + Sent: make([]cloudevents.Event, 0), + } + + return c +} From efc01aa749c1a0314ed5f9aa86e425731487681f Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 8 May 2019 10:56:11 -0700 Subject: [PATCH 12/16] add adapter tests. --- pkg/adapter/apiserver/adapter.go | 7 +- pkg/adapter/apiserver/adapter_test.go | 183 +++++++++++++++++++++++++- 2 files changed, 182 insertions(+), 8 deletions(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 4d74977e11f..d3d2cd05646 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -122,7 +122,7 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { } default: - a.logger.Fatal("mode not understood", a.mode) + return fmt.Errorf("mode %q not understood", a.mode) } for _, gvrc := range a.gvrcs { @@ -142,11 +142,6 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { return fmt.Errorf("failed starting shared index informer for %s on namespace %s", gvrc.GVR.String(), a.namespace) } - // Double check the delegate is not nil. - if d == nil { - continue - } - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: d.addEvent, UpdateFunc: d.updateEvent, diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 2233d0d46db..9ca612616d0 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,12 +17,191 @@ limitations under the License. package apiserver import ( + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" + "time" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" + kncetesting "github.com/knative/eventing/pkg/kncloudevents/testing" + rectesting "github.com/knative/eventing/pkg/reconciler/testing" "go.uber.org/zap" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "testing" ) +func TestNewAdaptor(t *testing.T) { + ce := kncetesting.NewTestClient() + logger := zap.NewExample().Sugar() + k8s := makeDynamicClient(nil) + + testCases := map[string]struct { + opt Options + source string + + wantMode string + wantNamespace string + wantGVRCs []GVRC + }{ + "empty opts": { + opt: Options{}, + wantMode: RefMode, + }, + "with source": { + source: "test-source", + opt: Options{}, + wantMode: RefMode, + }, + "with namespace": { + source: "test-source", + opt: Options{ + Namespace: "test-ns", + }, + wantMode: RefMode, + wantNamespace: "test-ns", + }, + "with mode resource": { + source: "test-source", + opt: Options{ + Mode: ResourceMode, + }, + wantMode: ResourceMode, + }, + "with mode ref": { + source: "test-source", + opt: Options{ + Mode: RefMode, + }, + wantMode: RefMode, + }, + "with mode trash": { + source: "test-source", + opt: Options{ + Mode: "trash", + }, + wantMode: RefMode, + }, + "with mode gvrs": { + source: "test-source", + opt: Options{ + GVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }, + Controller: true, + }}, + }, + wantMode: RefMode, + wantGVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }, + Controller: true, + }}, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt) + + got, ok := a.(*adapter) + if !ok { + t.Errorf("expected NewAdapter to return a *adapter, but did not") + } + if diff := cmp.Diff(tc.source, got.source); diff != "" { + t.Errorf("unexpected source diff (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.wantMode, got.mode); diff != "" { + t.Errorf("unexpected mode diff (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.wantNamespace, got.namespace); diff != "" { + t.Errorf("unexpected namespace diff (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.wantGVRCs, got.gvrcs); diff != "" { + t.Errorf("unexpected namespace diff (-want, +got) = %v", diff) + } + }) + } +} + +func TestAdapter_StartRef(t *testing.T) { + ce := kncetesting.NewTestClient() + logger := zap.NewExample().Sugar() + k8s := makeDynamicClient(nil) + source := "test-source" + opt := Options{ + Mode: RefMode, + Namespace: "default", + GVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + }}, + } + + a := NewAdaptor(source, k8s, ce, logger, opt) + + var err error + stopCh := make(chan struct{}) + go func() { + err = a.Start(stopCh) + }() + time.Sleep(1 * time.Millisecond) + stopCh <- struct{}{} + if err != nil { + t.Errorf("did not expect an error, but got %v", err) + } +} + +func TestAdapter_StartResource(t *testing.T) { + ce := kncetesting.NewTestClient() + logger := zap.NewExample().Sugar() + k8s := makeDynamicClient(nil) + source := "test-source" + opt := Options{ + Mode: ResourceMode, + Namespace: "default", + GVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + }}, + } + + a := NewAdaptor(source, k8s, ce, logger, opt) + + var err error + stopCh := make(chan struct{}) + go func() { + err = a.Start(stopCh) + }() + time.Sleep(1 * time.Millisecond) + stopCh <- struct{}{} + if err != nil { + t.Errorf("did not expect an error, but got %v", err) + } +} + +// Common methods: + +// GetDynamicClient returns the mockDynamicClient to use for this test case. +func makeDynamicClient(objects []runtime.Object) dynamic.Interface { + dynamicMocks := rectesting.DynamicMocks{} // TODO: maybe we need to customize this. + realInterface := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), objects...) + return rectesting.NewMockDynamicInterface(realInterface, dynamicMocks) +} + func simplePod(name, namespace string) *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{ From cf73f39a7da0d9273f08c4bb00587d2177246542 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 8 May 2019 17:20:20 -0700 Subject: [PATCH 13/16] trying to fix the tests but the cache is not playing with the fakes nicely. --- cmd/apiserver_receive_adapter/main.go | 2 +- pkg/adapter/apiserver/adapter_test.go | 35 +++++++++++++++++++-------- pkg/adapter/apiserver/ref.go | 2 +- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index f4552c6ca76..69553fa05b2 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -94,7 +94,7 @@ func main() { if err != nil { logger.Fatalw("Error parsing APIVersion", zap.Error(err)) } - // This is really bad. + // TODO: pass down the resource and the kind so we do not have to guess. gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) gvrcs = append(gvrcs, apiserver.GVRC{ GVR: gvr, diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 9ca612616d0..85beb1eb63d 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,18 +17,19 @@ limitations under the License. package apiserver import ( - "github.com/google/go-cmp/cmp" - "k8s.io/apimachinery/pkg/runtime/schema" - "testing" - "time" - + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" dynamicfake "k8s.io/client-go/dynamic/fake" + rt "runtime" + "testing" + "github.com/google/go-cmp/cmp" kncetesting "github.com/knative/eventing/pkg/kncloudevents/testing" rectesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/pkg/errors" "go.uber.org/zap" ) @@ -132,6 +133,9 @@ func TestNewAdaptor(t *testing.T) { } func TestAdapter_StartRef(t *testing.T) { + // TODO: fix this test, the cache informer will now sync. + t.Skip() + ce := kncetesting.NewTestClient() logger := zap.NewExample().Sugar() k8s := makeDynamicClient(nil) @@ -150,12 +154,15 @@ func TestAdapter_StartRef(t *testing.T) { a := NewAdaptor(source, k8s, ce, logger, opt) - var err error + err := errors.New("test never ran") stopCh := make(chan struct{}) go func() { err = a.Start(stopCh) }() - time.Sleep(1 * time.Millisecond) + + // Let the cache informer sync. + rt.Gosched() + stopCh <- struct{}{} if err != nil { t.Errorf("did not expect an error, but got %v", err) @@ -163,6 +170,9 @@ func TestAdapter_StartRef(t *testing.T) { } func TestAdapter_StartResource(t *testing.T) { + // TODO: fix this test, the cache informer will now sync. + t.Skip() + ce := kncetesting.NewTestClient() logger := zap.NewExample().Sugar() k8s := makeDynamicClient(nil) @@ -181,12 +191,15 @@ func TestAdapter_StartResource(t *testing.T) { a := NewAdaptor(source, k8s, ce, logger, opt) - var err error + err := errors.New("test never ran") stopCh := make(chan struct{}) go func() { err = a.Start(stopCh) }() - time.Sleep(1 * time.Millisecond) + + // Let the cache informer sync. + rt.Gosched() + stopCh <- struct{}{} if err != nil { t.Errorf("did not expect an error, but got %v", err) @@ -197,8 +210,10 @@ func TestAdapter_StartResource(t *testing.T) { // GetDynamicClient returns the mockDynamicClient to use for this test case. func makeDynamicClient(objects []runtime.Object) dynamic.Interface { + sc := runtime.NewScheme() + _ = corev1.AddToScheme(sc) dynamicMocks := rectesting.DynamicMocks{} // TODO: maybe we need to customize this. - realInterface := dynamicfake.NewSimpleDynamicClient(runtime.NewScheme(), objects...) + realInterface := dynamicfake.NewSimpleDynamicClient(sc, objects...) return rectesting.NewMockDynamicInterface(realInterface, dynamicMocks) } diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 19c2c3a8f86..8e28edc715d 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -44,7 +44,7 @@ func (a *ref) asController(obj interface{}) bool { if len(a.controlledGVRs) > 0 { object := obj.(*unstructured.Unstructured) gvk := object.GroupVersionKind() - // This is really bad. + // TODO: pass down the resource and the kind so we do not have to guess. gvr, _ := meta.UnsafeGuessKindToResource(gvk) for _, gvrc := range a.controlledGVRs { if reflect.DeepEqual(gvr, gvrc) { From 21210d5b1ea955d103a188bfc530c52284076649 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Thu, 9 May 2019 07:28:43 -0700 Subject: [PATCH 14/16] update lock. --- Gopkg.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Gopkg.lock b/Gopkg.lock index d6e6c86cf14..0f82ef17839 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1407,6 +1407,7 @@ "github.com/knative/test-infra/tools/dep-collector", "github.com/nats-io/go-nats-streaming", "github.com/nats-io/nats-streaming-server/server", + "github.com/pkg/errors", "github.com/robfig/cron", "go.opencensus.io/exporter/prometheus", "go.opencensus.io/stats", From 19c0ca9023eaa98da09782c4b5a41c6154879065 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Thu, 9 May 2019 08:07:27 -0700 Subject: [PATCH 15/16] Use cache reflector. --- config/300-apiserversource.yaml | 2 +- pkg/adapter/apiserver/adapter.go | 24 ++------- pkg/adapter/apiserver/adapter_test.go | 23 +++----- pkg/adapter/apiserver/events/events.go | 37 ++++--------- pkg/adapter/apiserver/events/events_test.go | 52 ++++-------------- pkg/adapter/apiserver/ref.go | 58 ++++++++++++++++++--- pkg/adapter/apiserver/ref_test.go | 30 +++++++---- pkg/adapter/apiserver/resource.go | 58 ++++++++++++++++++--- pkg/adapter/apiserver/resource_test.go | 24 ++++++--- 9 files changed, 173 insertions(+), 135 deletions(-) diff --git a/config/300-apiserversource.yaml b/config/300-apiserversource.yaml index 44b051ca3bc..1312561f33a 100644 --- a/config/300-apiserversource.yaml +++ b/config/300-apiserversource.yaml @@ -52,7 +52,7 @@ spec: description: "A reference to the object that should receive events." mode: type: string - description: "Mode controls the content of the event payload. One of: 'Ref' (only references of resources), 'Resource' (full resource as ResourceEvent)." + description: "Mode controls the content of the event payload. One of: 'Ref' (only references of resources), 'Resource' (full resource)." resources: items: properties: diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index d3d2cd05646..d91985a09a7 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -92,10 +92,7 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents } type eventDelegate interface { - addEvent(obj interface{}) - updateEvent(oldObj, newObj interface{}) - deleteEvent(obj interface{}) - + cache.Store addControllerWatch(gvr schema.GroupVersionResource) } @@ -126,30 +123,17 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { } for _, gvrc := range a.gvrcs { - var informer cache.SharedIndexInformer - lw := &cache.ListWatch{ ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List), WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch), } - informer = cache.NewSharedIndexInformer(lw, &unstructured.Unstructured{}, resyncPeriod, cache.Indexers{ - cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - }) - - go informer.Run(stopCh) - - if ok := cache.WaitForCacheSync(stopCh, informer.HasSynced); !ok { - return fmt.Errorf("failed starting shared index informer for %s on namespace %s", gvrc.GVR.String(), a.namespace) - } - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: d.addEvent, - UpdateFunc: d.updateEvent, - DeleteFunc: d.deleteEvent, - }) if gvrc.Controller { d.addControllerWatch(gvrc.GVR) } + + reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, d, resyncPeriod) + go reflector.Run(stop) } <-stopCh diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 85beb1eb63d..8d091df411d 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,6 +17,11 @@ limitations under the License. package apiserver import ( + "github.com/google/go-cmp/cmp" + kncetesting "github.com/knative/eventing/pkg/kncloudevents/testing" + rectesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/pkg/errors" + "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -25,12 +30,6 @@ import ( dynamicfake "k8s.io/client-go/dynamic/fake" rt "runtime" "testing" - - "github.com/google/go-cmp/cmp" - kncetesting "github.com/knative/eventing/pkg/kncloudevents/testing" - rectesting "github.com/knative/eventing/pkg/reconciler/testing" - "github.com/pkg/errors" - "go.uber.org/zap" ) func TestNewAdaptor(t *testing.T) { @@ -133,9 +132,6 @@ func TestNewAdaptor(t *testing.T) { } func TestAdapter_StartRef(t *testing.T) { - // TODO: fix this test, the cache informer will now sync. - t.Skip() - ce := kncetesting.NewTestClient() logger := zap.NewExample().Sugar() k8s := makeDynamicClient(nil) @@ -160,19 +156,15 @@ func TestAdapter_StartRef(t *testing.T) { err = a.Start(stopCh) }() - // Let the cache informer sync. + stopCh <- struct{}{} rt.Gosched() - stopCh <- struct{}{} if err != nil { t.Errorf("did not expect an error, but got %v", err) } } func TestAdapter_StartResource(t *testing.T) { - // TODO: fix this test, the cache informer will now sync. - t.Skip() - ce := kncetesting.NewTestClient() logger := zap.NewExample().Sugar() k8s := makeDynamicClient(nil) @@ -197,10 +189,9 @@ func TestAdapter_StartResource(t *testing.T) { err = a.Start(stopCh) }() - // Let the cache informer sync. + stopCh <- struct{}{} rt.Gosched() - stopCh <- struct{}{} if err != nil { t.Errorf("did not expect an error, but got %v", err) } diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go index 8508a889716..5d6ec76eb6c 100644 --- a/pkg/adapter/apiserver/events/events.go +++ b/pkg/adapter/apiserver/events/events.go @@ -37,37 +37,22 @@ const ( DeleteEventRefType = "dev.knative.apiserver.ref.delete" ) -type ResourceEvent struct { - Object *unstructured.Unstructured `json:"obj,omitempty"` - NewObject *unstructured.Unstructured `json:"newObj,omitempty"` - OldObject *unstructured.Unstructured `json:"oldObj,omitempty"` -} - func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { if obj == nil { return nil, fmt.Errorf("resource can not be nil") } object := obj.(*unstructured.Unstructured) - return makeEvent(source, AddEventType, object, &ResourceEvent{ - Object: object, - }) + return makeEvent(source, AddEventType, object, object) } -func MakeUpdateEvent(source string, oldObj, newObj interface{}) (*cloudevents.Event, error) { - if newObj == nil { - return nil, fmt.Errorf("new resource can not be nil") - } - object := newObj.(*unstructured.Unstructured) - - data := &ResourceEvent{ - NewObject: object, - } - if oldObj != nil { - data.OldObject = oldObj.(*unstructured.Unstructured) +func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") } + object := obj.(*unstructured.Unstructured) - return makeEvent(source, UpdateEventType, object, data) + return makeEvent(source, UpdateEventType, object, object) } func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { @@ -76,9 +61,7 @@ func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) } object := obj.(*unstructured.Unstructured) - return makeEvent(source, DeleteEventType, object, &ResourceEvent{ - Object: object, - }) + return makeEvent(source, DeleteEventType, object, object) } func getRef(object *unstructured.Unstructured, asController bool) corev1.ObjectReference { @@ -108,11 +91,11 @@ func MakeAddRefEvent(source string, asController bool, obj interface{}) (*cloude return makeEvent(source, AddEventRefType, object, getRef(object, asController)) } -func MakeUpdateRefEvent(source string, asController bool, oldObj, newObj interface{}) (*cloudevents.Event, error) { - if newObj == nil { +func MakeUpdateRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { return nil, fmt.Errorf("new resource can not be nil") } - object := newObj.(*unstructured.Unstructured) + object := obj.(*unstructured.Unstructured) return makeEvent(source, UpdateEventRefType, object, getRef(object, asController)) } diff --git a/pkg/adapter/apiserver/events/events_test.go b/pkg/adapter/apiserver/events/events_test.go index c255bad8fd9..1740216ad7a 100644 --- a/pkg/adapter/apiserver/events/events_test.go +++ b/pkg/adapter/apiserver/events/events_test.go @@ -89,7 +89,7 @@ func TestMakeAddEvent(t *testing.T) { }, }.AsV02(), }, - wantData: `{"obj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + wantData: `{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}`, }, } for n, tc := range testCases { @@ -102,8 +102,7 @@ func TestMakeAddEvent(t *testing.T) { func TestMakeUpdateEvent(t *testing.T) { testCases := map[string]struct { - oldObj interface{} - newObj interface{} + obj interface{} source string want *cloudevents.Event @@ -117,22 +116,7 @@ func TestMakeUpdateEvent(t *testing.T) { }, "simple pod": { source: "unit-test", - oldObj: simplePod("unit", "test"), - newObj: simplePod("unit", "test"), - want: &cloudevents.Event{ - Context: cloudevents.EventContextV02{ - Type: "dev.knative.apiserver.resource.update", - Source: *cloudevents.ParseURLRef("unit-test"), - Extensions: map[string]interface{}{ - "subject": "/apis/v1/namespaces/test/pods/unit", - }, - }.AsV02(), - }, - wantData: `{"newObj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}},"oldObj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, - }, - "nil old": { - source: "unit-test", - newObj: simplePod("unit", "test"), + obj: simplePod("unit", "test"), want: &cloudevents.Event{ Context: cloudevents.EventContextV02{ Type: "dev.knative.apiserver.resource.update", @@ -142,12 +126,12 @@ func TestMakeUpdateEvent(t *testing.T) { }, }.AsV02(), }, - wantData: `{"newObj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + wantData: `{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}`, }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - got, err := events.MakeUpdateEvent(tc.source, tc.oldObj, tc.newObj) + got, err := events.MakeUpdateEvent(tc.source, tc.obj) validate(t, got, err, tc.want, tc.wantData, tc.wantErr) }) } @@ -179,7 +163,7 @@ func TestMakeDeleteEvent(t *testing.T) { }, }.AsV02(), }, - wantData: `{"obj":{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}}`, + wantData: `{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}`, }, } for n, tc := range testCases { @@ -245,8 +229,7 @@ func TestMakeAddRefEvent(t *testing.T) { func TestMakeUpdateRefEvent(t *testing.T) { testCases := map[string]struct { - oldObj interface{} - newObj interface{} + obj interface{} source string asController bool @@ -261,8 +244,7 @@ func TestMakeUpdateRefEvent(t *testing.T) { }, "simple pod": { source: "unit-test", - oldObj: simplePod("unit", "test"), - newObj: simplePod("unit", "test"), + obj: simplePod("unit", "test"), want: &cloudevents.Event{ Context: cloudevents.EventContextV02{ Type: "dev.knative.apiserver.ref.update", @@ -276,7 +258,7 @@ func TestMakeUpdateRefEvent(t *testing.T) { }, "simple owned pod": { source: "unit-test", - newObj: simpleOwnedPod("unit", "test"), + obj: simpleOwnedPod("unit", "test"), asController: true, want: &cloudevents.Event{ Context: cloudevents.EventContextV02{ @@ -289,24 +271,10 @@ func TestMakeUpdateRefEvent(t *testing.T) { }, wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, }, - "nil old": { - source: "unit-test", - newObj: simplePod("unit", "test"), - want: &cloudevents.Event{ - Context: cloudevents.EventContextV02{ - Type: "dev.knative.apiserver.ref.update", - Source: *cloudevents.ParseURLRef("unit-test"), - Extensions: map[string]interface{}{ - "subject": "/apis/v1/namespaces/test/pods/unit", - }, - }.AsV02(), - }, - wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, - }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - got, err := events.MakeUpdateRefEvent(tc.source, tc.asController, tc.oldObj, tc.newObj) + got, err := events.MakeUpdateRefEvent(tc.source, tc.asController, tc.obj) validate(t, got, err, tc.want, tc.wantData, tc.wantErr) }) } diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 8e28edc715d..f0f25dfd1bb 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -21,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" "reflect" cloudevents "github.com/cloudevents/sdk-go" @@ -36,6 +37,8 @@ type ref struct { controlledGVRs []schema.GroupVersionResource } +var _ cache.Store = (*ref)(nil) + // TODO: I think asController is not the feature we want. I think we want to be // able to set the controller as a filter to the watch. Not emit all owners of // the resource. Fix this. It has to be an api change on the CRD. @@ -55,40 +58,49 @@ func (a *ref) asController(obj interface{}) bool { return false } -func (a *ref) addEvent(obj interface{}) { +// Implements cache.Store +func (a *ref) Add(obj interface{}) error { event, err := events.MakeAddRefEvent(a.source, a.asController(obj), obj) if err != nil { a.logger.Info("event creation failed", zap.Error(err)) - return + return err } if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + return err } + return nil } -func (a *ref) updateEvent(oldObj, newObj interface{}) { - event, err := events.MakeUpdateRefEvent(a.source, a.asController(newObj), oldObj, newObj) +// Implements cache.Store +func (a *ref) Update(obj interface{}) error { + event, err := events.MakeUpdateRefEvent(a.source, a.asController(obj), obj) if err != nil { a.logger.Info("event creation failed", zap.Error(err)) - return + return err } if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + return err } + return nil } -func (a *ref) deleteEvent(obj interface{}) { +// Implements cache.Store +func (a *ref) Delete(obj interface{}) error { event, err := events.MakeDeleteRefEvent(a.source, a.asController(obj), obj) if err != nil { a.logger.Info("event creation failed", zap.Error(err)) - return + return err } if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + return err } + return nil } func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { @@ -98,3 +110,35 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { } a.controlledGVRs = append(a.controlledGVRs, gvr) } + +// Stub cache.Store impl + +// Implements cache.Store +func (a *ref) List() []interface{} { + return nil +} + +// Implements cache.Store +func (a *ref) ListKeys() []string { + return nil +} + +// Implements cache.Store +func (a *ref) Get(obj interface{}) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *ref) GetByKey(key string) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *ref) Replace([]interface{}, string) error { + return nil +} + +// Implements cache.Store +func (a *ref) Resync() error { + return nil +} diff --git a/pkg/adapter/apiserver/ref_test.go b/pkg/adapter/apiserver/ref_test.go index af96ceb4829..d70e1fa5ecb 100644 --- a/pkg/adapter/apiserver/ref_test.go +++ b/pkg/adapter/apiserver/ref_test.go @@ -8,37 +8,37 @@ import ( func TestRefAddEvent(t *testing.T) { d, ce := makeRefAndTestingClient() - d.addEvent(simplePod("unit", "test")) + d.Add(simplePod("unit", "test")) validateSent(t, ce, events.AddEventRefType) } func TestRefUpdateEvent(t *testing.T) { d, ce := makeRefAndTestingClient() - d.updateEvent(nil, simplePod("unit", "test")) + d.Update(simplePod("unit", "test")) validateSent(t, ce, events.UpdateEventRefType) } func TestRefDeleteEvent(t *testing.T) { d, ce := makeRefAndTestingClient() - d.deleteEvent(simplePod("unit", "test")) + d.Delete(simplePod("unit", "test")) validateSent(t, ce, events.DeleteEventRefType) } func TestRefAddEventNil(t *testing.T) { d, ce := makeRefAndTestingClient() - d.addEvent(nil) + d.Add(nil) validateNotSent(t, ce, events.AddEventRefType) } func TestRefUpdateEventNil(t *testing.T) { d, ce := makeRefAndTestingClient() - d.updateEvent(nil, nil) + d.Update(nil) validateNotSent(t, ce, events.UpdateEventRefType) } func TestRefDeleteEventNil(t *testing.T) { d, ce := makeRefAndTestingClient() - d.deleteEvent(nil) + d.Delete(nil) validateNotSent(t, ce, events.DeleteEventRefType) } @@ -49,7 +49,7 @@ func TestRefAddEventAsController(t *testing.T) { Version: "v1", Resource: "pods", }) - d.addEvent(simpleOwnedPod("unit", "test")) + d.Add(simpleOwnedPod("unit", "test")) validateSent(t, ce, events.AddEventRefType) } @@ -60,7 +60,7 @@ func TestRefUpdateEventAsController(t *testing.T) { Version: "v1", Resource: "pods", }) - d.updateEvent(nil, simpleOwnedPod("unit", "test")) + d.Update(simpleOwnedPod("unit", "test")) validateSent(t, ce, events.UpdateEventRefType) } @@ -71,6 +71,18 @@ func TestRefDeleteEventAsController(t *testing.T) { Version: "v1", Resource: "pods", }) - d.deleteEvent(simpleOwnedPod("unit", "test")) + d.Delete(simpleOwnedPod("unit", "test")) validateSent(t, ce, events.DeleteEventRefType) } + +// HACKHACKHACK For test coverage. +func TestRefStub(t *testing.T) { + d, _ := makeRefAndTestingClient() + + d.List() + d.ListKeys() + d.Get(nil) + d.GetByKey("") + d.Replace(nil, "") + d.Resync() +} diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index 724525e45ef..6eef0bbf978 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -22,6 +22,7 @@ import ( "github.com/knative/eventing/pkg/adapter/apiserver/events" "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" ) type resource struct { @@ -30,43 +31,86 @@ type resource struct { logger *zap.SugaredLogger } -func (a *resource) addEvent(obj interface{}) { +var _ cache.Store = (*resource)(nil) + +func (a *resource) Add(obj interface{}) error { event, err := events.MakeAddEvent(a.source, obj) if err != nil { a.logger.Info("event creation failed", zap.Error(err)) - return + return err } if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + return err } + + return nil } -func (a *resource) updateEvent(oldObj, newObj interface{}) { - event, err := events.MakeUpdateEvent(a.source, oldObj, newObj) +func (a *resource) Update(obj interface{}) error { + event, err := events.MakeUpdateEvent(a.source, obj) if err != nil { a.logger.Info("event creation failed", zap.Error(err)) - return + return err } if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + return err } + + return nil } -func (a *resource) deleteEvent(obj interface{}) { +func (a *resource) Delete(obj interface{}) error { event, err := events.MakeDeleteEvent(a.source, obj) if err != nil { a.logger.Info("event creation failed", zap.Error(err)) - return + return err } if _, err := a.ce.Send(context.Background(), *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + return err } + + return nil } func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { // not supported for resource. a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) } + +// Stub cache.Store impl + +// Implements cache.Store +func (a *resource) List() []interface{} { + return nil +} + +// Implements cache.Store +func (a *resource) ListKeys() []string { + return nil +} + +// Implements cache.Store +func (a *resource) Get(obj interface{}) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *resource) GetByKey(key string) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *resource) Replace([]interface{}, string) error { + return nil +} + +// Implements cache.Store +func (a *resource) Resync() error { + return nil +} diff --git a/pkg/adapter/apiserver/resource_test.go b/pkg/adapter/apiserver/resource_test.go index 363d035a09f..bcab080bb38 100644 --- a/pkg/adapter/apiserver/resource_test.go +++ b/pkg/adapter/apiserver/resource_test.go @@ -8,37 +8,37 @@ import ( func TestResourceAddEvent(t *testing.T) { d, ce := makeResourceAndTestingClient() - d.addEvent(simplePod("unit", "test")) + d.Add(simplePod("unit", "test")) validateSent(t, ce, events.AddEventType) } func TestResourceUpdateEvent(t *testing.T) { d, ce := makeResourceAndTestingClient() - d.updateEvent(nil, simplePod("unit", "test")) + d.Update(simplePod("unit", "test")) validateSent(t, ce, events.UpdateEventType) } func TestResourceDeleteEvent(t *testing.T) { d, ce := makeResourceAndTestingClient() - d.deleteEvent(simplePod("unit", "test")) + d.Delete(simplePod("unit", "test")) validateSent(t, ce, events.DeleteEventType) } func TestResourceAddEventNil(t *testing.T) { d, ce := makeResourceAndTestingClient() - d.addEvent(nil) + d.Add(nil) validateNotSent(t, ce, events.AddEventType) } func TestResourceUpdateEventNil(t *testing.T) { d, ce := makeResourceAndTestingClient() - d.updateEvent(nil, nil) + d.Update(nil) validateNotSent(t, ce, events.UpdateEventType) } func TestResourceDeleteEventNil(t *testing.T) { d, ce := makeResourceAndTestingClient() - d.deleteEvent(nil) + d.Delete(nil) validateNotSent(t, ce, events.DeleteEventType) } @@ -46,3 +46,15 @@ func TestResourceCoverageHacks(t *testing.T) { d, _ := makeResourceAndTestingClient() d.addControllerWatch(schema.GroupVersionResource{}) // for coverage. } + +// HACKHACKHACK For test coverage. +func TestResourceStub(t *testing.T) { + d, _ := makeResourceAndTestingClient() + + d.List() + d.ListKeys() + d.Get(nil) + d.GetByKey("") + d.Replace(nil, "") + d.Resync() +} From 91da4627559441d00122ba5971e48783deb72ecc Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Thu, 9 May 2019 08:19:47 -0700 Subject: [PATCH 16/16] fix race. --- pkg/adapter/apiserver/adapter_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 8d091df411d..95bb8578158 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -28,7 +28,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" dynamicfake "k8s.io/client-go/dynamic/fake" - rt "runtime" "testing" ) @@ -152,12 +151,14 @@ func TestAdapter_StartRef(t *testing.T) { err := errors.New("test never ran") stopCh := make(chan struct{}) + done := make(chan struct{}) go func() { err = a.Start(stopCh) + done <- struct{}{} }() stopCh <- struct{}{} - rt.Gosched() + <-done if err != nil { t.Errorf("did not expect an error, but got %v", err) @@ -185,12 +186,14 @@ func TestAdapter_StartResource(t *testing.T) { err := errors.New("test never ran") stopCh := make(chan struct{}) + done := make(chan struct{}) go func() { err = a.Start(stopCh) + done <- struct{}{} }() stopCh <- struct{}{} - rt.Gosched() + <-done if err != nil { t.Errorf("did not expect an error, but got %v", err)