diff --git a/Gopkg.lock b/Gopkg.lock index 20a6e81ce8b..0f82ef17839 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1379,11 +1379,7 @@ "github.com/Shopify/sarama", "github.com/bsm/sarama-cluster", "github.com/cloudevents/sdk-go", - "github.com/cloudevents/sdk-go/pkg/cloudevents", - "github.com/cloudevents/sdk-go/pkg/cloudevents/client", - "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", - "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/google/go-cmp/cmp", "github.com/google/go-cmp/cmp/cmpopts", "github.com/google/uuid", @@ -1411,6 +1407,7 @@ "github.com/knative/test-infra/tools/dep-collector", "github.com/nats-io/go-nats-streaming", "github.com/nats-io/nats-streaming-server/server", + "github.com/pkg/errors", "github.com/robfig/cron", "go.opencensus.io/exporter/prometheus", "go.opencensus.io/stats", diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index e7bc7004eed..69553fa05b2 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -18,21 +18,12 @@ package main import ( "flag" - "time" - - "k8s.io/client-go/rest" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "github.com/knative/eventing/pkg/reconciler" - "github.com/kelseyhightower/envconfig" "github.com/knative/eventing/pkg/adapter/apiserver" "github.com/knative/eventing/pkg/kncloudevents" - "github.com/knative/pkg/apis/duck" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - kncontroller "github.com/knative/pkg/controller" "github.com/knative/pkg/signals" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -48,12 +39,16 @@ var ( ) type envConfig struct { + Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` + Mode string `envconfig:"MODE"` SinkURI string `split_words:"true" required:"true"` ApiVersion []string `split_words:"true" required:"true"` Kind []string `required:"true"` Controller []bool `required:"true"` } +// TODO: the controller should take the list of GVR + func main() { flag.Parse() @@ -79,11 +74,6 @@ func main() { logger = logger.With(zap.String("controller/apiserver", "adapter")) logger.Info("Starting the controller") - numControllers := len(env.ApiVersion) - cfg.QPS = float32(numControllers) * rest.DefaultQPS - cfg.Burst = numControllers * rest.DefaultBurst - opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh) - client, err := dynamic.NewForConfig(cfg) if err != nil { logger.Fatalw("Error building dynamic client", zap.Error(err)) @@ -94,43 +84,33 @@ func main() { logger.Fatalw("Error building cloud event client", zap.Error(err)) } - controllers := []*kncontroller.Impl{} + gvrcs := []apiserver.GVRC(nil) - // Create one controller per resource. for i, apiVersion := range env.ApiVersion { kind := env.Kind[i] controlled := env.Controller[i] - obj := &duckv1alpha1.AddressableType{} - - factory := duck.TypedInformerFactory{ - Client: client, - ResyncPeriod: time.Duration(10), // TODO - StopChannel: stopCh, - Type: obj, - } - gv, err := schema.ParseGroupVersion(apiVersion) if err != nil { logger.Fatalw("Error parsing APIVersion", zap.Error(err)) } + // TODO: pass down the resource and the kind so we do not have to guess. + gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) + gvrcs = append(gvrcs, apiserver.GVRC{ + GVR: gvr, + Controller: controlled, + }) + } - gvk := schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version} - - // This is really bad. - gvr, _ := meta.UnsafeGuessKindToResource(gvk) - - // Get and start the informer for gvr - logger.Infof("Starting informer for %v", gvk) - informer, lister, err := factory.Get(gvr) - if err != nil { - logger.Fatalw("Error starting informer", zap.Error(err)) - } - controllers = append(controllers, apiserver.NewController(opt, informer, lister, eventsClient, controlled)) + opt := apiserver.Options{ + Namespace: env.Namespace, + Mode: env.Mode, + GVRCs: gvrcs, } - // Start all of the controllers. - logger.Info("Starting controllers.") - go kncontroller.StartAll(stopCh, controllers...) - <-stopCh + a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt) + logger.Info("starting kubernetes api adapter") + if err := a.Start(stopCh); err != nil { + logger.Warn("start returned an error,", zap.Error(err)) + } } diff --git a/config/300-apiserversource.yaml b/config/300-apiserversource.yaml index 1d1ac990297..1312561f33a 100644 --- a/config/300-apiserversource.yaml +++ b/config/300-apiserversource.yaml @@ -50,6 +50,9 @@ spec: sink: type: object description: "A reference to the object that should receive events." + mode: + type: string + description: "Mode controls the content of the event payload. One of: 'Ref' (only references of resources), 'Resource' (full resource)." resources: items: properties: @@ -61,7 +64,7 @@ spec: description: "Kind of the objects to watch." controller: type: boolean - description: "If true, watch the managing controller. More info: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/" + description: "If true, emits the managing controller ref. Only supported for mode=Ref. More info: https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/" type: array required: - resources diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index db14f450645..d91985a09a7 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -17,113 +17,144 @@ limitations under the License. package apiserver import ( - "context" - - "github.com/cloudevents/sdk-go/pkg/cloudevents" - eventsclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client" - "github.com/cloudevents/sdk-go/pkg/cloudevents/types" - "github.com/knative/eventing/pkg/reconciler" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - "github.com/knative/pkg/controller" - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - apierrs "k8s.io/apimachinery/pkg/api/errors" + "fmt" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" + + cloudevents "github.com/cloudevents/sdk-go" + "go.uber.org/zap" ) -const ( - // ReconcilerName is the name of the reconciler - ReconcilerName = "ApiServerSource" +type Adapter interface { + Start(stopCh <-chan struct{}) error +} - controllerAgentName = "apiserver-source-adapter-controller" - updateEventType = "dev.knative.apiserver.object.update" - deleteEventType = "dev.knative.apiserver.object.delete" +const ( + // RefMode produces payloads of ObjectReference + RefMode = "Ref" + // ResourceMode produces payloads of ResourceEvent + ResourceMode = "Resource" ) -// NewController initializes the controller and is called by the generated code -// Registers event handlers to enqueue events -func NewController( - opt reconciler.Options, - informer cache.SharedInformer, - lister cache.GenericLister, - eventsclient eventsclient.Client, - controlled bool) *controller.Impl { - - r := &Reconciler{ - Base: reconciler.NewBase(opt, controllerAgentName), - lister: lister, - eventsClient: eventsclient, - } - impl := controller.NewImpl(r, r.Logger, ReconcilerName, reconciler.MustNewStatsReporter(ReconcilerName, r.Logger)) - - r.Logger.Info("Setting up event handlers") +// Options hold the options for the Adapter. +type Options struct { + Mode string + Namespace string + GVRCs []GVRC +} - if controlled { - informer.AddEventHandler(reconciler.Handler(impl.EnqueueControllerOf)) - } else { - informer.AddEventHandler(reconciler.Handler(impl.Enqueue)) - } - return impl +// GVRC is a pairing of GroupVersionResource and Controller flag. +type GVRC struct { + GVR schema.GroupVersionResource + Controller bool } -// Reconciler reconciles an ApiServerSource object -type Reconciler struct { - *reconciler.Base +type adapter struct { + gvrcs []GVRC + k8s dynamic.Interface + ce cloudevents.Client + source string + namespace string + logger *zap.SugaredLogger - eventsClient eventsclient.Client - lister cache.GenericLister + mode string + delegate eventDelegate } -// Reconcile sends a cloud event corresponding to the given key -func (r *Reconciler) Reconcile(ctx context.Context, key string) error { - // Convert the namespace/name string into a distinct namespace and name - namespace, name, err := cache.SplitMetaNamespaceKey(key) - if err != nil { - r.Logger.Errorf("invalid resource key: %s", key) - return nil +func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter { + mode := opt.Mode + switch mode { + case ResourceMode, RefMode: + // ok + default: + logger.Warn("unknown mode ", mode) + mode = RefMode + logger.Warn("defaulting mode to ", mode) } - // Get the resource with this namespace/name - original, err := r.lister.ByNamespace(namespace).Get(name) - if apierrs.IsNotFound(err) { - // The resource may no longer exist, in which case we stop processing. - r.Logger.Error("resource key in work queue no longer exists", zap.Any("key", key)) - return nil - } else if err != nil { - return err + a := &adapter{ + k8s: k8sClient, + ce: ceClient, + source: source, + logger: logger, + gvrcs: opt.GVRCs, + namespace: opt.Namespace, + mode: mode, } + return a +} - object := original.(*duckv1alpha1.AddressableType) +type eventDelegate interface { + cache.Store + addControllerWatch(gvr schema.GroupVersionResource) +} - eventType := updateEventType - timestamp := object.GetCreationTimestamp() - if object.GetDeletionTimestamp() != nil { - eventType = deleteEventType - timestamp = *object.GetDeletionTimestamp() +func (a *adapter) Start(stopCh <-chan struct{}) error { + // Local stop channel. + stop := make(chan struct{}) + + resyncPeriod := time.Duration(10 * time.Hour) + + var d eventDelegate + switch a.mode { + case ResourceMode: + d = &resource{ + ce: a.ce, + source: a.source, + logger: a.logger, + } + + case RefMode: + d = &ref{ + ce: a.ce, + source: a.source, + logger: a.logger, + } + + default: + return fmt.Errorf("mode %q not understood", a.mode) } - objectRef := corev1.ObjectReference{ - APIVersion: object.APIVersion, - Kind: object.Kind, - Name: object.GetName(), - Namespace: object.GetNamespace(), - } + for _, gvrc := range a.gvrcs { + lw := &cache.ListWatch{ + ListFunc: asUnstructuredLister(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).List), + WatchFunc: asUnstructuredWatcher(a.k8s.Resource(gvrc.GVR).Namespace(a.namespace).Watch), + } - event := cloudevents.Event{ - Context: cloudevents.EventContextV02{ - ID: string(object.GetUID()), - Type: eventType, - Source: *types.ParseURLRef(object.GetSelfLink()), - Time: &types.Timestamp{Time: timestamp.Time}, - }.AsV02(), - Data: objectRef, + if gvrc.Controller { + d.addControllerWatch(gvrc.GVR) + } + + reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, d, resyncPeriod) + go reflector.Run(stop) } - if _, err := r.eventsClient.Send(ctx, event); err != nil { - r.Logger.Error("failed to send cloudevent (retrying)", err) + <-stopCh + stop <- struct{}{} + return nil +} + +type unstructuredLister func(metav1.ListOptions) (*unstructured.UnstructuredList, error) - return err +func asUnstructuredLister(ulist unstructuredLister) cache.ListFunc { + return func(opts metav1.ListOptions) (runtime.Object, error) { + ul, err := ulist(opts) + if err != nil { + return nil, err + } + return ul, nil } +} - return nil +func asUnstructuredWatcher(wf cache.WatchFunc) cache.WatchFunc { + return func(lo metav1.ListOptions) (watch.Interface, error) { + return wf(lo) + } } diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 171755796d2..95bb8578158 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,187 +17,272 @@ limitations under the License. package apiserver import ( - "context" - "io/ioutil" - "net/http" - "net/http/httptest" - gotesting "testing" - "time" - - "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/json" "github.com/google/go-cmp/cmp" - "github.com/knative/eventing/pkg/kncloudevents" - "github.com/knative/eventing/pkg/reconciler" - "github.com/knative/pkg/apis/duck" - duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" - logtesting "github.com/knative/pkg/logging/testing" + kncetesting "github.com/knative/eventing/pkg/kncloudevents/testing" + rectesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/pkg/errors" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - fakedynamicclientset "k8s.io/client-go/dynamic/fake" - fakekubeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/dynamic" + dynamicfake "k8s.io/client-go/dynamic/fake" + "testing" ) -const ( - sourceName = "test-apiserver-adapter" - sourceUID = "1234-5678-90" - testNS = "testnamespace" -) - -type testCase struct { - // Name is a descriptive name for this test suitable as a first argument to t.Run() - Name string - - // InitialState is the list of objects that already exists when reconciliation - // starts. - InitialState []runtime.Object - - // Key is the parameter to reconciliation. - // This has the form "namespace/name". - Key string - - // Where to send events - sink func(http.ResponseWriter, *http.Request) - - // Expected event data - data interface{} -} - -func TestReconcile(t *gotesting.T) { - table := []testCase{ - { - Name: "Receive Pod creation event", - InitialState: []runtime.Object{ - getPod(), +func TestNewAdaptor(t *testing.T) { + ce := kncetesting.NewTestClient() + logger := zap.NewExample().Sugar() + k8s := makeDynamicClient(nil) + + testCases := map[string]struct { + opt Options + source string + + wantMode string + wantNamespace string + wantGVRCs []GVRC + }{ + "empty opts": { + opt: Options{}, + wantMode: RefMode, + }, + "with source": { + source: "test-source", + opt: Options{}, + wantMode: RefMode, + }, + "with namespace": { + source: "test-source", + opt: Options{ + Namespace: "test-ns", }, - Key: testNS + "/" + sourceName, - - sink: sinkAccepted, - data: decode(t, encode(t, getPodRef())), + wantMode: RefMode, + wantNamespace: "test-ns", + }, + "with mode resource": { + source: "test-source", + opt: Options{ + Mode: ResourceMode, + }, + wantMode: ResourceMode, + }, + "with mode ref": { + source: "test-source", + opt: Options{ + Mode: RefMode, + }, + wantMode: RefMode, + }, + "with mode trash": { + source: "test-source", + opt: Options{ + Mode: "trash", + }, + wantMode: RefMode, + }, + "with mode gvrs": { + source: "test-source", + opt: Options{ + GVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }, + Controller: true, + }}, + }, + wantMode: RefMode, + wantGVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }, + Controller: true, + }}, }, } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + + a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt) - for _, tc := range table { - t.Run(tc.Name, func(t *gotesting.T) { - // Create fake sink server - h := &fakeHandler{ - handler: tc.sink, + got, ok := a.(*adapter) + if !ok { + t.Errorf("expected NewAdapter to return a *adapter, but did not") + } + if diff := cmp.Diff(tc.source, got.source); diff != "" { + t.Errorf("unexpected source diff (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.wantMode, got.mode); diff != "" { + t.Errorf("unexpected mode diff (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.wantNamespace, got.namespace); diff != "" { + t.Errorf("unexpected namespace diff (-want, +got) = %v", diff) + } + if diff := cmp.Diff(tc.wantGVRCs, got.gvrcs); diff != "" { + t.Errorf("unexpected namespace diff (-want, +got) = %v", diff) } + }) + } +} - sinkServer := httptest.NewServer(h) - defer sinkServer.Close() +func TestAdapter_StartRef(t *testing.T) { + ce := kncetesting.NewTestClient() + logger := zap.NewExample().Sugar() + k8s := makeDynamicClient(nil) + source := "test-source" + opt := Options{ + Mode: RefMode, + Namespace: "default", + GVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + }}, + } - // Bind cloud event client - ceClient, err := kncloudevents.NewDefaultClient(sinkServer.URL) - if err != nil { - t.Errorf("cannot create cloud event client: %v", zap.Error(err)) - } + a := NewAdaptor(source, k8s, ce, logger, opt) - // Create fake dynamic client - dynamicScheme := runtime.NewScheme() - client := fakedynamicclientset.NewSimpleDynamicClient(dynamicScheme, tc.InitialState...) + err := errors.New("test never ran") + stopCh := make(chan struct{}) + done := make(chan struct{}) + go func() { + err = a.Start(stopCh) + done <- struct{}{} + }() - stopCh := make(chan struct{}) - defer close(stopCh) + stopCh <- struct{}{} + <-done - tif := &duck.TypedInformerFactory{ - Client: client, - Type: &duckv1alpha1.AddressableType{}, - ResyncPeriod: 1 * time.Second, - StopChannel: stopCh, - } + if err != nil { + t.Errorf("did not expect an error, but got %v", err) + } +} - _, lister, err := tif.Get(schema.GroupVersionResource{Group: "", Resource: "pods", Version: "v1"}) - if err != nil { - t.Fatalf("Get() = %v", err) - } +func TestAdapter_StartResource(t *testing.T) { + ce := kncetesting.NewTestClient() + logger := zap.NewExample().Sugar() + k8s := makeDynamicClient(nil) + source := "test-source" + opt := Options{ + Mode: ResourceMode, + Namespace: "default", + GVRCs: []GVRC{{ + GVR: schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + }}, + } - opt := reconciler.Options{ - KubeClientSet: fakekubeclientset.NewSimpleClientset(), - Logger: logtesting.TestLogger(t), - } + a := NewAdaptor(source, k8s, ce, logger, opt) - r := &Reconciler{ - Base: reconciler.NewBase(opt, controllerAgentName), - eventsClient: ceClient, - lister: lister, - } - ctx := context.Background() + err := errors.New("test never ran") + stopCh := make(chan struct{}) + done := make(chan struct{}) + go func() { + err = a.Start(stopCh) + done <- struct{}{} + }() - err = r.Reconcile(ctx, tc.Key) - if err != nil { - t.Errorf("Expected no error") - } + stopCh <- struct{}{} + <-done - if diff := cmp.Diff(tc.data, decode(t, h.body)); diff != "" { - t.Errorf("incorrect event (-want, +got): %v", diff) - } - }) + if err != nil { + t.Errorf("did not expect an error, but got %v", err) } +} + +// Common methods: +// GetDynamicClient returns the mockDynamicClient to use for this test case. +func makeDynamicClient(objects []runtime.Object) dynamic.Interface { + sc := runtime.NewScheme() + _ = corev1.AddToScheme(sc) + dynamicMocks := rectesting.DynamicMocks{} // TODO: maybe we need to customize this. + realInterface := dynamicfake.NewSimpleDynamicClient(sc, objects...) + return rectesting.NewMockDynamicInterface(realInterface, dynamicMocks) } -func getPod() runtime.Object { +func simplePod(name, namespace string) *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{ "apiVersion": "v1", "kind": "Pod", "metadata": map[string]interface{}{ - "namespace": testNS, - "name": sourceName, - "selfLink": "/apis/v1/namespaces/" + testNS + "/pod/" + sourceName, + "namespace": namespace, + "name": name, }, }, } } -func getPodRef() corev1.ObjectReference { - return corev1.ObjectReference{ - APIVersion: "v1", - Kind: "Pod", - Name: sourceName, - Namespace: testNS, +func simpleOwnedPod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": "owned", + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "apps/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "ReplicaSet", + "name": name, + "uid": "0c119059-7113-11e9-a6c5-42010a8a00ed", + }, + }, + }, + }, } } -type fakeHandler struct { - body []byte - header http.Header +func validateSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want string) { + if got := len(ce.Sent); got != 1 { + t.Errorf("Expected 1 event to be sent, got %d", got) + } - handler func(http.ResponseWriter, *http.Request) + if got := ce.Sent[0].Type(); got != want { + t.Errorf("Expected %q event to be sent, got %q", want, got) + } } -func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.header = r.Header - body, err := ioutil.ReadAll(r.Body) - if err != nil { - http.Error(w, "can not read body", http.StatusBadRequest) - return +func validateNotSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want string) { + if got := len(ce.Sent); got != 0 { + t.Errorf("Expected 0 event to be sent, got %d", got) } - h.body = body - defer r.Body.Close() - h.handler(w, r) } -func sinkAccepted(writer http.ResponseWriter, req *http.Request) { - writer.WriteHeader(http.StatusOK) -} +func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClient) { + ce := kncetesting.NewTestClient() + source := "unit-test" + logger := zap.NewExample().Sugar() -func encode(t *gotesting.T, data interface{}) string { - b, err := json.Encode(data) - if err != nil { - t.Fatalf("failed to encode data: %v", err) - } - return string(b) + return &resource{ + ce: ce, + source: source, + logger: logger, + }, ce } -func decode(t *gotesting.T, data interface{}) interface{} { - var out interface{} - err := json.Decode(data, &out) - if err != nil { - t.Fatalf("failed to decode data: %v", err) - } - return out +func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) { + ce := kncetesting.NewTestClient() + source := "unit-test" + logger := zap.NewExample().Sugar() + + return &ref{ + ce: ce, + source: source, + logger: logger, + }, ce } diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go new file mode 100644 index 00000000000..5d6ec76eb6c --- /dev/null +++ b/pkg/adapter/apiserver/events/events.go @@ -0,0 +1,147 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events + +import ( + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + cloudevents "github.com/cloudevents/sdk-go" +) + +const ( + AddEventType = "dev.knative.apiserver.resource.add" + UpdateEventType = "dev.knative.apiserver.resource.update" + DeleteEventType = "dev.knative.apiserver.resource.delete" + + AddEventRefType = "dev.knative.apiserver.ref.add" + UpdateEventRefType = "dev.knative.apiserver.ref.update" + DeleteEventRefType = "dev.knative.apiserver.ref.delete" +) + +func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } + object := obj.(*unstructured.Unstructured) + + return makeEvent(source, AddEventType, object, object) +} + +func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } + object := obj.(*unstructured.Unstructured) + + return makeEvent(source, UpdateEventType, object, object) +} + +func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } + object := obj.(*unstructured.Unstructured) + + return makeEvent(source, DeleteEventType, object, object) +} + +func getRef(object *unstructured.Unstructured, asController bool) corev1.ObjectReference { + if asController { + if owner := metav1.GetControllerOf(object); owner != nil { + return corev1.ObjectReference{ + APIVersion: owner.APIVersion, + Kind: owner.Kind, + Name: owner.Name, + Namespace: object.GetNamespace(), + } + } + } + return corev1.ObjectReference{ + APIVersion: object.GetAPIVersion(), + Kind: object.GetKind(), + Name: object.GetName(), + Namespace: object.GetNamespace(), + } +} + +func MakeAddRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } + object := obj.(*unstructured.Unstructured) + return makeEvent(source, AddEventRefType, object, getRef(object, asController)) +} + +func MakeUpdateRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("new resource can not be nil") + } + object := obj.(*unstructured.Unstructured) + return makeEvent(source, UpdateEventRefType, object, getRef(object, asController)) +} + +func MakeDeleteRefEvent(source string, asController bool, obj interface{}) (*cloudevents.Event, error) { + if obj == nil { + return nil, fmt.Errorf("resource can not be nil") + } + object := obj.(*unstructured.Unstructured) + return makeEvent(source, DeleteEventRefType, object, getRef(object, asController)) +} + +func makeEvent(source, eventType string, obj *unstructured.Unstructured, data interface{}) (*cloudevents.Event, error) { + subject := createSelfLink(corev1.ObjectReference{ + APIVersion: obj.GetAPIVersion(), + Kind: obj.GetKind(), + Name: obj.GetName(), + Namespace: obj.GetNamespace(), + }) + + event := cloudevents.NewEvent() + event.SetType(eventType) + event.SetSource(source) + event.SetSubject(subject) + + if err := event.SetData(data); err != nil { + return nil, err + } + return &event, nil +} + +// Creates a URI of the form found in object metadata selfLinks +// Format looks like: /apis/feeds.knative.dev/v1alpha1/namespaces/default/feeds/k8s-events-example +// KNOWN ISSUES: +// * ObjectReference.APIVersion has no version information (e.g. serving.knative.dev rather than serving.knative.dev/v1alpha1) +// * ObjectReference does not have enough information to create the pluaralized list type (e.g. "revisions" from kind: Revision) +// +// Track these issues at https://github.com/kubernetes/kubernetes/issues/66313 +// We could possibly work around this by adding a lister for the resources referenced by these events. +func createSelfLink(o corev1.ObjectReference) string { + collectionNameHack := strings.ToLower(o.Kind) + "s" + versionNameHack := o.APIVersion + + // Core API types don't have a separate package name and only have a version string (e.g. /apis/v1/namespaces/default/pods/myPod) + // To avoid weird looking strings like "v1/versionUnknown" we'll sniff for a "." in the version + if strings.Contains(versionNameHack, ".") && !strings.Contains(versionNameHack, "/") { + versionNameHack = versionNameHack + "/versionUnknown" + } + return fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", versionNameHack, o.Namespace, collectionNameHack, o.Name) +} diff --git a/pkg/adapter/apiserver/events/events_test.go b/pkg/adapter/apiserver/events/events_test.go new file mode 100644 index 00000000000..1740216ad7a --- /dev/null +++ b/pkg/adapter/apiserver/events/events_test.go @@ -0,0 +1,357 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package events_test + +import ( + "github.com/google/go-cmp/cmp/cmpopts" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "strings" + "testing" + + cloudevents "github.com/cloudevents/sdk-go" + "github.com/google/go-cmp/cmp" + "github.com/knative/eventing/pkg/adapter/apiserver/events" +) + +func simplePod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": name, + }, + }, + } +} + +func simpleOwnedPod(name, namespace string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": map[string]interface{}{ + "namespace": namespace, + "name": "owned", + "ownerReferences": []interface{}{ + map[string]interface{}{ + "apiVersion": "apps/v1", + "blockOwnerDeletion": true, + "controller": true, + "kind": "ReplicaSet", + "name": name, + "uid": "0c119059-7113-11e9-a6c5-42010a8a00ed", + }, + }, + }, + }, + } +} + +func TestMakeAddEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.add", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeAddEvent(tc.source, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeUpdateEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "new resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeUpdateEvent(tc.source, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeDeleteEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.resource.delete", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"apiVersion":"v1","kind":"Pod","metadata":{"name":"unit","namespace":"test"}}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeDeleteEvent(tc.source, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeAddRefEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + asController bool + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.add", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + "simple owned pod": { + source: "unit-test", + obj: simpleOwnedPod("unit", "test"), + asController: true, + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.add", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/owned", + }, + }.AsV02(), + }, + wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeAddRefEvent(tc.source, tc.asController, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeUpdateRefEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + asController bool + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "new resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + "simple owned pod": { + source: "unit-test", + obj: simpleOwnedPod("unit", "test"), + asController: true, + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.update", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/owned", + }, + }.AsV02(), + }, + wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeUpdateRefEvent(tc.source, tc.asController, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func TestMakeDeleteRefEvent(t *testing.T) { + testCases := map[string]struct { + obj interface{} + source string + asController bool + + want *cloudevents.Event + wantData string + wantErr string + }{ + "nil object": { + source: "unit-test", + want: nil, + wantErr: "resource can not be nil", + }, + "simple pod": { + source: "unit-test", + obj: simplePod("unit", "test"), + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.delete", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/unit", + }, + }.AsV02(), + }, + wantData: `{"kind":"Pod","namespace":"test","name":"unit","apiVersion":"v1"}`, + }, + "simple owned pod": { + source: "unit-test", + obj: simpleOwnedPod("unit", "test"), + asController: true, + want: &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "dev.knative.apiserver.ref.delete", + Source: *cloudevents.ParseURLRef("unit-test"), + Extensions: map[string]interface{}{ + "subject": "/apis/v1/namespaces/test/pods/owned", + }, + }.AsV02(), + }, + wantData: `{"kind":"ReplicaSet","namespace":"test","name":"unit","apiVersion":"apps/v1"}`, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + got, err := events.MakeDeleteRefEvent(tc.source, tc.asController, tc.obj) + validate(t, got, err, tc.want, tc.wantData, tc.wantErr) + }) + } +} + +func validate(t *testing.T, got *cloudevents.Event, err error, want *cloudevents.Event, wantData, wantErr string) { + if wantErr != "" || err != nil { + var gotErr string + if err != nil { + gotErr = err.Error() + } + if !strings.Contains(wantErr, gotErr) { + diff := cmp.Diff(wantErr, gotErr) + t.Errorf("unexpected error (-want, +got) = %v", diff) + } + return + } + + if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(cloudevents.Event{}, "Data", "DataEncoded")); diff != "" { + t.Errorf("unexpected event diff (-want, +got) = %v", diff) + } + + gotData := string(got.Data.([]byte)) + if diff := cmp.Diff(wantData, gotData); diff != "" { + t.Errorf("unexpected data diff (-want, +got) = %v", diff) + } +} diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go new file mode 100644 index 00000000000..f0f25dfd1bb --- /dev/null +++ b/pkg/adapter/apiserver/ref.go @@ -0,0 +1,144 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + "reflect" + + cloudevents "github.com/cloudevents/sdk-go" + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "go.uber.org/zap" +) + +type ref struct { + ce cloudevents.Client + source string + logger *zap.SugaredLogger + + controlledGVRs []schema.GroupVersionResource +} + +var _ cache.Store = (*ref)(nil) + +// TODO: I think asController is not the feature we want. I think we want to be +// able to set the controller as a filter to the watch. Not emit all owners of +// the resource. Fix this. It has to be an api change on the CRD. + +func (a *ref) asController(obj interface{}) bool { + if len(a.controlledGVRs) > 0 { + object := obj.(*unstructured.Unstructured) + gvk := object.GroupVersionKind() + // TODO: pass down the resource and the kind so we do not have to guess. + gvr, _ := meta.UnsafeGuessKindToResource(gvk) + for _, gvrc := range a.controlledGVRs { + if reflect.DeepEqual(gvr, gvrc) { + return true + } + } + } + return false +} + +// Implements cache.Store +func (a *ref) Add(obj interface{}) error { + event, err := events.MakeAddRefEvent(a.source, a.asController(obj), obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return err + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + return nil +} + +// Implements cache.Store +func (a *ref) Update(obj interface{}) error { + event, err := events.MakeUpdateRefEvent(a.source, a.asController(obj), obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return err + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + return nil +} + +// Implements cache.Store +func (a *ref) Delete(obj interface{}) error { + event, err := events.MakeDeleteRefEvent(a.source, a.asController(obj), obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return err + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + return nil +} + +func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { + if a.controlledGVRs == nil { + a.controlledGVRs = []schema.GroupVersionResource{gvr} + return + } + a.controlledGVRs = append(a.controlledGVRs, gvr) +} + +// Stub cache.Store impl + +// Implements cache.Store +func (a *ref) List() []interface{} { + return nil +} + +// Implements cache.Store +func (a *ref) ListKeys() []string { + return nil +} + +// Implements cache.Store +func (a *ref) Get(obj interface{}) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *ref) GetByKey(key string) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *ref) Replace([]interface{}, string) error { + return nil +} + +// Implements cache.Store +func (a *ref) Resync() error { + return nil +} diff --git a/pkg/adapter/apiserver/ref_test.go b/pkg/adapter/apiserver/ref_test.go new file mode 100644 index 00000000000..d70e1fa5ecb --- /dev/null +++ b/pkg/adapter/apiserver/ref_test.go @@ -0,0 +1,88 @@ +package apiserver + +import ( + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" +) + +func TestRefAddEvent(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.Add(simplePod("unit", "test")) + validateSent(t, ce, events.AddEventRefType) +} + +func TestRefUpdateEvent(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.Update(simplePod("unit", "test")) + validateSent(t, ce, events.UpdateEventRefType) +} + +func TestRefDeleteEvent(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.Delete(simplePod("unit", "test")) + validateSent(t, ce, events.DeleteEventRefType) +} + +func TestRefAddEventNil(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.Add(nil) + validateNotSent(t, ce, events.AddEventRefType) +} + +func TestRefUpdateEventNil(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.Update(nil) + validateNotSent(t, ce, events.UpdateEventRefType) +} + +func TestRefDeleteEventNil(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.Delete(nil) + validateNotSent(t, ce, events.DeleteEventRefType) +} + +func TestRefAddEventAsController(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }) + d.Add(simpleOwnedPod("unit", "test")) + validateSent(t, ce, events.AddEventRefType) +} + +func TestRefUpdateEventAsController(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }) + d.Update(simpleOwnedPod("unit", "test")) + validateSent(t, ce, events.UpdateEventRefType) +} + +func TestRefDeleteEventAsController(t *testing.T) { + d, ce := makeRefAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }) + d.Delete(simpleOwnedPod("unit", "test")) + validateSent(t, ce, events.DeleteEventRefType) +} + +// HACKHACKHACK For test coverage. +func TestRefStub(t *testing.T) { + d, _ := makeRefAndTestingClient() + + d.List() + d.ListKeys() + d.Get(nil) + d.GetByKey("") + d.Replace(nil, "") + d.Resync() +} diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go new file mode 100644 index 00000000000..6eef0bbf978 --- /dev/null +++ b/pkg/adapter/apiserver/resource.go @@ -0,0 +1,116 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "context" + cloudevents "github.com/cloudevents/sdk-go" + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +type resource struct { + ce cloudevents.Client + source string + logger *zap.SugaredLogger +} + +var _ cache.Store = (*resource)(nil) + +func (a *resource) Add(obj interface{}) error { + event, err := events.MakeAddEvent(a.source, obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return err + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + + return nil +} + +func (a *resource) Update(obj interface{}) error { + event, err := events.MakeUpdateEvent(a.source, obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return err + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + + return nil +} + +func (a *resource) Delete(obj interface{}) error { + event, err := events.MakeDeleteEvent(a.source, obj) + if err != nil { + a.logger.Info("event creation failed", zap.Error(err)) + return err + } + + if _, err := a.ce.Send(context.Background(), *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + + return nil +} + +func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { + // not supported for resource. + a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) +} + +// Stub cache.Store impl + +// Implements cache.Store +func (a *resource) List() []interface{} { + return nil +} + +// Implements cache.Store +func (a *resource) ListKeys() []string { + return nil +} + +// Implements cache.Store +func (a *resource) Get(obj interface{}) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *resource) GetByKey(key string) (item interface{}, exists bool, err error) { + return nil, false, nil +} + +// Implements cache.Store +func (a *resource) Replace([]interface{}, string) error { + return nil +} + +// Implements cache.Store +func (a *resource) Resync() error { + return nil +} diff --git a/pkg/adapter/apiserver/resource_test.go b/pkg/adapter/apiserver/resource_test.go new file mode 100644 index 00000000000..bcab080bb38 --- /dev/null +++ b/pkg/adapter/apiserver/resource_test.go @@ -0,0 +1,60 @@ +package apiserver + +import ( + "github.com/knative/eventing/pkg/adapter/apiserver/events" + "k8s.io/apimachinery/pkg/runtime/schema" + "testing" +) + +func TestResourceAddEvent(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.Add(simplePod("unit", "test")) + validateSent(t, ce, events.AddEventType) +} + +func TestResourceUpdateEvent(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.Update(simplePod("unit", "test")) + validateSent(t, ce, events.UpdateEventType) +} + +func TestResourceDeleteEvent(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.Delete(simplePod("unit", "test")) + validateSent(t, ce, events.DeleteEventType) +} + +func TestResourceAddEventNil(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.Add(nil) + validateNotSent(t, ce, events.AddEventType) +} + +func TestResourceUpdateEventNil(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.Update(nil) + validateNotSent(t, ce, events.UpdateEventType) +} + +func TestResourceDeleteEventNil(t *testing.T) { + d, ce := makeResourceAndTestingClient() + d.Delete(nil) + validateNotSent(t, ce, events.DeleteEventType) +} + +func TestResourceCoverageHacks(t *testing.T) { + d, _ := makeResourceAndTestingClient() + d.addControllerWatch(schema.GroupVersionResource{}) // for coverage. +} + +// HACKHACKHACK For test coverage. +func TestResourceStub(t *testing.T) { + d, _ := makeResourceAndTestingClient() + + d.List() + d.ListKeys() + d.Get(nil) + d.GetByKey("") + d.Replace(nil, "") + d.Resync() +} diff --git a/pkg/apis/sources/v1alpha1/apiserver_types.go b/pkg/apis/sources/v1alpha1/apiserver_types.go index 344a0cea997..edd632fc175 100644 --- a/pkg/apis/sources/v1alpha1/apiserver_types.go +++ b/pkg/apis/sources/v1alpha1/apiserver_types.go @@ -67,6 +67,11 @@ type ApiServerSourceSpec struct { // Sink is a reference to an object that will resolve to a domain name to use as the sink. // +optional Sink *corev1.ObjectReference `json:"sink,omitempty"` + + // Mode is the mode the receive adapter controller runs under: Ref or Resource. + // `Ref` sends only the reference to the resource. + // `Resource` send the full resource. + Mode string `json:"mode,omitempty"` } // ApiServerSourceStatus defines the observed state of ApiServerSource diff --git a/pkg/kncloudevents/testing/test_client.go b/pkg/kncloudevents/testing/test_client.go new file mode 100644 index 00000000000..97c3422ea14 --- /dev/null +++ b/pkg/kncloudevents/testing/test_client.go @@ -0,0 +1,37 @@ +package testing + +import ( + "context" + cloudevents "github.com/cloudevents/sdk-go" +) + +type TestCloudEventsClient struct { + Sent []cloudevents.Event +} + +var _ cloudevents.Client = (*TestCloudEventsClient)(nil) + +func (c *TestCloudEventsClient) Send(ctx context.Context, event cloudevents.Event) (*cloudevents.Event, error) { + // TODO: improve later. + c.Sent = append(c.Sent, event) + return nil, nil +} + +func (c *TestCloudEventsClient) StartReceiver(ctx context.Context, fn interface{}) error { + // TODO: improve later. + <-ctx.Done() + return nil +} + +func (c *TestCloudEventsClient) Reset() { + c.Sent = make([]cloudevents.Event, 0) +} + +func NewTestClient() *TestCloudEventsClient { + + c := &TestCloudEventsClient{ + Sent: make([]cloudevents.Event, 0), + } + + return c +} diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 378e800f01b..7fd4223c9e0 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -56,7 +56,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - "sidecar.istio.io/inject": "true", + "sidecar.istio.io/inject": "false", // needs to talk to the api server. }, Labels: args.Labels, }, @@ -92,30 +92,27 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar sep = "," } - return []corev1.EnvVar{ - { - Name: "SINK_URI", - Value: sinkURI, - }, - { - Name: "API_VERSION", - Value: apiversions, - }, - { - Name: "KIND", - Value: kinds, - }, - { - Name: "CONTROLLER", - Value: controlled, - }, - { - Name: "SYSTEM_NAMESPACE", - ValueFrom: &corev1.EnvVarSource{ - FieldRef: &corev1.ObjectFieldSelector{ - FieldPath: "metadata.namespace", - }, + return []corev1.EnvVar{{ + Name: "SINK_URI", + Value: sinkURI, + }, { + Name: "MODE", + Value: spec.Mode, + }, { + Name: "API_VERSION", + Value: apiversions, + }, { + Name: "KIND", + Value: kinds, + }, { + Name: "CONTROLLER", + Value: controlled, + }, { + Name: "SYSTEM_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: "metadata.namespace", }, }, - } + }} } diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index f6574ec8ee6..0b72cb128ea 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -89,7 +89,7 @@ func TestMakeReceiveAdapter(t *testing.T) { Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - "sidecar.istio.io/inject": "true", + "sidecar.istio.io/inject": "false", }, Labels: map[string]string{ "test-key1": "test-value1", @@ -106,6 +106,8 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "SINK_URI", Value: "sink-uri", + }, { + Name: "MODE", }, { Name: "API_VERSION", Value: ",", diff --git a/sample/apiserver_pod.yaml b/sample/apiserver_pod.yaml new file mode 100644 index 00000000000..a97b1b918c1 --- /dev/null +++ b/sample/apiserver_pod.yaml @@ -0,0 +1,47 @@ +apiVersion: sources.eventing.knative.dev/v1alpha1 +kind: ApiServerSource +metadata: + name: api-server-source +spec: + serviceAccountName: events-sa + resources: + - apiVersion: v1 + kind: Pod + mode: "Resource" + sink: + apiVersion: eventing.knative.dev/v1alpha1 + kind: Broker + name: default +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: events-sa + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: event-watcher +rules: + - apiGroups: + - "" + resources: + - pods + verbs: + - get + - list + - watch +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: k8s-ra-event-watcher +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: event-watcher +subjects: + - kind: ServiceAccount + name: events-sa + namespace: default