diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index e93a64abe96..a2a13eba26e 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/adapter/v2" @@ -44,9 +45,10 @@ type apiServerAdapter struct { config Config - k8s dynamic.Interface - source string // TODO: who dis? - name string // TODO: who dis? + discover discovery.DiscoveryInterface + k8s dynamic.Interface + source string // TODO: who dis? + name string // TODO: who dis? } func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error { @@ -75,15 +77,40 @@ func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error { a.logger.Infof("STARTING -- %#v", a.config) - for _, r := range a.config.Resources { - lw := &cache.ListWatch{ - // TODO: this will not work with cluster scoped resources. - ListFunc: asUnstructuredLister(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).List, r.LabelSelector), - WatchFunc: asUnstructuredWatcher(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).Watch, r.LabelSelector), + for _, configRes := range a.config.Resources { + + resources, err := a.discover.ServerResourcesForGroupVersion(configRes.GVR.GroupVersion().String()) + if err != nil { + a.logger.Errorf("Could not retrieve information about resource %s: %s", configRes.GVR.String(), err.Error()) + continue } - reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod) - go reflector.Run(stop) + exists := false + for _, apires := range resources.APIResources { + if apires.Name == configRes.GVR.Resource { + + var res dynamic.ResourceInterface + if apires.Namespaced { + res = a.k8s.Resource(configRes.GVR).Namespace(a.config.Namespace) + } else { + res = a.k8s.Resource(configRes.GVR) + } + + lw := &cache.ListWatch{ + ListFunc: asUnstructuredLister(res.List, configRes.LabelSelector), + WatchFunc: asUnstructuredWatcher(res.Watch, configRes.LabelSelector), + } + + reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod) + go reflector.Run(stop) + exists = true + break + } + } + + if !exists { + a.logger.Errorf("Could not retrieve information about resource %s: %s", configRes.GVR.String()) + } } <-stopCh diff --git a/pkg/adapter/apiserver/adapter_injection.go b/pkg/adapter/apiserver/adapter_injection.go index 70b92d78b4e..045bae46b1d 100644 --- a/pkg/adapter/apiserver/adapter_injection.go +++ b/pkg/adapter/apiserver/adapter_injection.go @@ -23,6 +23,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "k8s.io/client-go/rest" "knative.dev/eventing/pkg/adapter/v2" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/injection" "knative.dev/pkg/injection/clients/dynamicclient" "knative.dev/pkg/logging" @@ -66,11 +67,12 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie } return &apiServerAdapter{ - k8s: dynamicclient.Get(ctx), - ce: ceClient, - source: Get(ctx), - name: env.Name, - config: config, + discover: kubeclient.Get(ctx).Discovery(), + k8s: dynamicclient.Get(ctx), + ce: ceClient, + source: Get(ctx), + name: env.Name, + config: config, logger: logger, } diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 2257a7f53de..fc360b41593 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -23,11 +23,15 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + discoveryfake "k8s.io/client-go/discovery/fake" "k8s.io/client-go/dynamic" dynamicfake "k8s.io/client-go/dynamic/fake" + kubetesting "k8s.io/client-go/testing" adaptertest "knative.dev/eventing/pkg/adapter/v2/test" rectesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/pkg/logging" @@ -53,9 +57,11 @@ func TestAdapter_StartRef(t *testing.T) { ce: ce, logger: logging.FromContext(ctx), config: config, - k8s: makeDynamicClient(simplePod("foo", "default")), - source: "unit-test", - name: "unittest", + + discover: makeDiscoveryClient(), + k8s: makeDynamicClient(simplePod("foo", "default")), + source: "unit-test", + name: "unittest", } err := errors.New("test never ran") @@ -98,9 +104,58 @@ func TestAdapter_StartResource(t *testing.T) { ce: ce, logger: logging.FromContext(ctx), config: config, - k8s: makeDynamicClient(simplePod("foo", "default")), - source: "unit-test", - name: "unittest", + + discover: makeDiscoveryClient(), + k8s: makeDynamicClient(simplePod("foo", "default")), + source: "unit-test", + name: "unittest", + } + + err := errors.New("test never ran") + stopCh := make(chan struct{}) + done := make(chan struct{}) + go func() { + err = a.Start(stopCh) + done <- struct{}{} + }() + + // Wait for the reflector to be fully initialized. + // Ideally we want to check LastSyncResourceVersion is not empty but we + // don't have access to it. + time.Sleep(1 * time.Second) + + stopCh <- struct{}{} + <-done + + if err != nil { + t.Errorf("did not expect an error, but got %v", err) + } +} + +func TestAdapter_StartNonNamespacedResource(t *testing.T) { + ce := adaptertest.NewTestClient() + + config := Config{ + Namespace: "default", + Resources: []ResourceWatch{{ + GVR: schema.GroupVersionResource{ + Version: "v1", + Resource: "namespaces", + }, + }}, + EventMode: "Resource", + } + ctx, _ := pkgtesting.SetupFakeContext(t) + + a := &apiServerAdapter{ + ce: ce, + logger: logging.FromContext(ctx), + config: config, + + discover: makeDiscoveryClient(), + k8s: makeDynamicClient(simpleNamespace("foo")), + source: "unit-test", + name: "unittest", } err := errors.New("test never ran") @@ -135,6 +190,35 @@ func makeDynamicClient(objects ...runtime.Object) dynamic.Interface { return rectesting.NewMockDynamicInterface(realInterface, dynamicMocks) } +func makeDiscoveryClient() discovery.DiscoveryInterface { + return &discoveryfake.FakeDiscovery{ + Fake: &kubetesting.Fake{ + Resources: []*metav1.APIResourceList{ + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + // All resources used at tests need to be listed here + { + Name: "pods", + Namespaced: true, + Group: "", + Version: "v1", + Kind: "Pod", + }, + { + Name: "namespaces", + Namespaced: false, + Group: "", + Version: "v1", + Kind: "Namespace", + }, + }, + }, + }, + }, + } +} + func simplePod(name, namespace string) *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{ @@ -148,6 +232,18 @@ func simplePod(name, namespace string) *unstructured.Unstructured { } } +func simpleNamespace(name string) *unstructured.Unstructured { + return &unstructured.Unstructured{ + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": map[string]interface{}{ + "name": name, + }, + }, + } +} + func simpleOwnedPod(name, namespace string) *unstructured.Unstructured { return &unstructured.Unstructured{ Object: map[string]interface{}{