From addcdaf79c4923377199d65177544668f27f6e8a Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 4 Mar 2023 00:28:35 +0100 Subject: [PATCH 1/5] small refactoring and improve coverage K8s Signed-off-by: Giovanni Liva --- go.mod | 1 + go.sum | 1 + pkg/runtime/from_config.go | 20 +- pkg/sync/kubernetes/kubernetes_sync.go | 64 ++--- pkg/sync/kubernetes/kubernetes_sync_test.go | 249 +++++++++++++++++++- 5 files changed, 299 insertions(+), 36 deletions(-) diff --git a/go.mod b/go.mod index 01fd399e0..349e79995 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( github.com/cucumber/messages-go/v16 v16.0.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.10.1 // indirect + github.com/evanphx/json-patch v4.12.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 413bd96db..7030855a1 100644 --- a/go.sum +++ b/go.sum @@ -105,6 +105,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go. github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84= +github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch/v5 v5.6.0 h1:b91NhWfaz02IuVxO9faSllyAtNXHMPkC5J8sJCLunww= github.com/evanphx/json-patch/v5 v5.6.0/go.mod h1:G79N1coSVB93tBe7j6PhzjmR3/2VvlbKOFpnXhI9Bw4= github.com/flowstack/go-jsonschema v0.1.1/go.mod h1:yL7fNggx1o8rm9RlgXv7hTBWxdBM0rVwpMwimd3F3N0= diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 0920601f0..e61a8392b 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -77,9 +77,13 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { ) rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri)) case regCrd.Match(uriB): + k, err := r.newK8s(uri, logger) + if err != nil { + return err + } r.SyncImpl = append( r.SyncImpl, - r.newK8s(uri, logger), + k, ) rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri)) case regURL.Match(uriB): @@ -127,15 +131,21 @@ func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync { } } -func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync { +func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { + reader, dynamic, err := kubernetes.GetClients() + if err != nil { + return nil, err + } return &kubernetes.Sync{ Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "kubernetes"), ), - URI: regCrd.ReplaceAllString(uri, ""), - ProviderArgs: r.config.ProviderArgs, - } + URI: regCrd.ReplaceAllString(uri, ""), + ProviderArgs: r.config.ProviderArgs, + ReadClient: reader, + DynamicClient: dynamic, + }, nil } func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync { diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 740ae18b8..64d684054 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -23,20 +23,22 @@ import ( ) var ( - resyncPeriod = 1 * time.Minute - apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) + resyncPeriod = 1 * time.Minute + apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) + featurFlagConfigurationResource = v1alpha1.GroupVersion.WithResource("featureflagconfigurations") ) type Sync struct { - Logger *logger.Logger - ProviderArgs sync.ProviderArgs - URI string - - ready bool - namespace string - crdName string - readClient client.Reader - informer cache.SharedInformer + Logger *logger.Logger + ProviderArgs sync.ProviderArgs + URI string + ReadClient client.Reader + DynamicClient dynamic.Interface + + ready bool + namespace string + crdName string + informer cache.SharedInformer } func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { @@ -59,28 +61,12 @@ func (k *Sync) Init(ctx context.Context) error { if err := v1alpha1.AddToScheme(scheme.Scheme); err != nil { return err } - clusterConfig, err := k8sClusterConfig() - if err != nil { - return err - } - - k.readClient, err = client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) - if err != nil { - return err - } - - dynamicClient, err := dynamic.NewForConfig(clusterConfig) - if err != nil { - return err - } - - resource := v1alpha1.GroupVersion.WithResource("featureflagconfigurations") // The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero. // For more details on resync implications refer to tools/cache/shared_informer.go - factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, resyncPeriod, k.namespace, nil) + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.DynamicClient, resyncPeriod, k.namespace, nil) - k.informer = factory.ForResource(resource).Informer() + k.informer = factory.ForResource(featurFlagConfigurationResource).Informer() return nil } @@ -178,7 +164,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) { // fallback to API access - this is an informer cache miss. Could happen at the startup where cache is not filled var ff v1alpha1.FeatureFlagConfiguration - err = k.readClient.Get(ctx, client.ObjectKey{ + err = k.ReadClient.Get(ctx, client.ObjectKey{ Name: k.crdName, Namespace: k.namespace, }, &ff) @@ -325,3 +311,21 @@ func k8sClusterConfig() (*rest.Config, error) { return clusterConfig, nil } + +func GetClients() (client.Reader, dynamic.Interface, error) { + clusterConfig, err := k8sClusterConfig() + if err != nil { + return nil, nil, err + } + + readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return nil, nil, err + } + + dynamicClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + return nil, nil, err + } + return readClient, dynamicClient, nil +} diff --git a/pkg/sync/kubernetes/kubernetes_sync_test.go b/pkg/sync/kubernetes/kubernetes_sync_test.go index 0f92013ac..0d820cdeb 100644 --- a/pkg/sync/kubernetes/kubernetes_sync_test.go +++ b/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -4,7 +4,11 @@ import ( "context" "encoding/json" "errors" + "fmt" + "go.uber.org/zap/zapcore" + "k8s.io/client-go/kubernetes/scheme" "reflect" + "strings" "testing" "time" @@ -18,8 +22,11 @@ import ( "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic/fake" "sigs.k8s.io/controller-runtime/pkg/client" + fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake" ) var Metadata = v1.TypeMeta{ @@ -442,7 +449,7 @@ func TestSync_fetch(t *testing.T) { GetByKeyFunc: tt.args.InformerGetFunc, }, }, - readClient: &MockClient{ + ReadClient: &MockClient{ getResponse: tt.args.ClientResponse, clientErr: tt.args.ClientError, }, @@ -556,6 +563,246 @@ func TestSync_watcher(t *testing.T) { } } +func TestInit(t *testing.T) { + t.Run("expect error with wrong URI format", func(t *testing.T) { + k := Sync{URI: ""} + e := k.Init(context.TODO()) + if e == nil { + t.Errorf("Expected error but got none") + } + if k.IsReady() { + t.Errorf("Expected NOT to be ready") + } + }) + t.Run("expect informer registration", func(t *testing.T) { + const name = "myFF" + const ns = "myNS" + scheme := runtime.NewScheme() + ff := &unstructured.Unstructured{} + ff.SetUnstructuredContent(getCFG(name, ns)) + fakeClient := fake.NewSimpleDynamicClient(scheme, ff) + k := Sync{ + URI: fmt.Sprintf("%s/%s", ns, name), + DynamicClient: fakeClient, + namespace: ns, + } + e := k.Init(context.TODO()) + if e != nil { + t.Errorf("Unexpected error: %v", e) + } + if k.informer == nil { + t.Errorf("Informer not initialized") + } + if k.IsReady() { + t.Errorf("The Sync should not be ready") + } + }) +} + +func TestSync(t *testing.T) { + const name = "myFF" + const ns = "myNS" + s := runtime.NewScheme() + ff := &unstructured.Unstructured{} + ff.SetUnstructuredContent(getCFG(name, ns)) + fakeDynamicClient := fake.NewSimpleDynamicClient(s, ff) + validFFCfg := &v1alpha1.FeatureFlagConfiguration{ + TypeMeta: Metadata, + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: ns, + }, + } + fakeReadClient := newFakeReadClient(validFFCfg) + l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + t.Run("Happy Path", func(t *testing.T) { + k := Sync{ + URI: fmt.Sprintf("%s/%s", ns, name), + DynamicClient: fakeDynamicClient, + ReadClient: fakeReadClient, + namespace: ns, + Logger: logger.NewLogger(l, true), + } + e := k.Init(context.TODO()) + if e != nil { + t.Errorf("Unexpected error: %v", e) + } + dataChannel := make(chan sync.DataSync) + go func() { + if err := k.Sync(context.TODO(), dataChannel); err != nil { + t.Errorf("Unexpected error: %v", e) + } + }() + d := <-dataChannel + if d.Type != sync.ALL { + t.Errorf("Expected %v, got %v", sync.ALL, d) + } + }) + t.Run("CRD not found", func(t *testing.T) { + k := Sync{ + URI: fmt.Sprintf("doesnt%s/exist%s", ns, name), + DynamicClient: fakeDynamicClient, + ReadClient: fakeReadClient, + namespace: ns, + Logger: logger.NewLogger(l, true), + } + e := k.Init(context.TODO()) + if e != nil { + t.Errorf("Unexpected error: %v", e) + } + dataChannel := make(chan sync.DataSync) + if err := k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { + t.Errorf("Unexpected error: %v", err) + } + }) + t.Run("ReSync", func(t *testing.T) { + k := Sync{ + URI: fmt.Sprintf("%s/%s", ns, name), + DynamicClient: fakeDynamicClient, + ReadClient: fakeReadClient, + namespace: ns, + Logger: logger.NewLogger(l, true), + } + e := k.Init(context.TODO()) + if e != nil { + t.Errorf("Unexpected error: %v", e) + } + dataChannel := make(chan sync.DataSync) + go func() { + if err := k.ReSync(context.TODO(), dataChannel); err != nil { + t.Errorf("Unexpected error: %v", e) + } + }() + d := <-dataChannel + if d.Type != sync.ALL { + t.Errorf("Expected %v, got %v", sync.ALL, d) + } + }) + t.Run("Resync CRD not found", func(t *testing.T) { + k := Sync{ + URI: fmt.Sprintf("doesnt%s/exist%s", ns, name), + DynamicClient: fakeDynamicClient, + ReadClient: fakeReadClient, + namespace: ns, + Logger: logger.NewLogger(l, true), + } + e := k.Init(context.TODO()) + if e != nil { + t.Errorf("Unexpected error: %v", e) + } + dataChannel := make(chan sync.DataSync) + if err := k.ReSync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { + t.Errorf("Unexpected error: %v", err) + } + }) +} + +func TestNotify(t *testing.T) { + const name = "myFF" + const ns = "myNS" + s := runtime.NewScheme() + ff := &unstructured.Unstructured{} + cfg := getCFG(name, ns) + ff.SetUnstructuredContent(cfg) + fc := fake.NewSimpleDynamicClient(s, ff) + l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + k := Sync{ + URI: fmt.Sprintf("%s/%s", ns, name), + DynamicClient: fc, + namespace: ns, + Logger: logger.NewLogger(l, true), + } + err := k.Init(context.TODO()) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if k.informer == nil { + t.Errorf("Informer not initialized") + } + c := make(chan INotify) + go func() { k.notify(context.TODO(), c) }() + + // wait for informer callbacks to be set + msg := <-c + if msg.GetEvent().EventType != DefaultEventTypeReady { + t.Errorf("Expected message %v, got %v", DefaultEventTypeReady, msg) + } + // create + cfg["status"] = map[string]interface{}{ + "empty": "", + } + ff.SetUnstructuredContent(cfg) + _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + msg = <-c + if msg.GetEvent().EventType != DefaultEventTypeCreate { + t.Errorf("Expected message %v, got %v", DefaultEventTypeCreate, msg) + } + // update + old := cfg["metadata"].(map[string]interface{}) + old["resourceVersion"] = "newVersion" + cfg["metadata"] = old + ff.SetUnstructuredContent(cfg) + _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + msg = <-c + if msg.GetEvent().EventType != DefaultEventTypeModify { + t.Errorf("Expected message %v, got %v", DefaultEventTypeModify, msg) + } + // delete + err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + msg = <-c + if msg.GetEvent().EventType != DefaultEventTypeDelete { + t.Errorf("Expected message %v, got %v", DefaultEventTypeDelete, msg) + } + + // validate we don't crash parsing wrong spec + cfg["spec"] = map[string]interface{}{ + "featureFlagSpec": int64(12), + } + ff.SetUnstructuredContent(cfg) + _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Create(context.TODO(), ff, v1.CreateOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + cfg["status"] = map[string]interface{}{ + "bump": "1", + } + ff.SetUnstructuredContent(cfg) + _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{}) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } +} + +func newFakeReadClient(objs ...client.Object) client.Client { + _ = v1alpha1.AddToScheme(scheme.Scheme) + return fakeClient.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(objs...).Build() +} + +func getCFG(name, namespace string) map[string]interface{} { + return map[string]interface{}{ + "apiVersion": "core.openfeature.dev/v1alpha1", + "kind": "FeatureFlagConfiguration", + "metadata": map[string]interface{}{ + "name": name, + "namespace": namespace, + }, + "spec": map[string]interface{}{}, + } +} + // toUnstructured helper to convert an interface to unstructured.Unstructured func toUnstructured(t *testing.T, obj interface{}) interface{} { bytes, err := json.Marshal(obj) From 74042452bf5870a18c1ea725cfaec58a4e4c01b4 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 4 Mar 2023 09:18:21 +0100 Subject: [PATCH 2/5] add more tests Signed-off-by: Giovanni Liva --- pkg/sync/kubernetes/kubernetes_sync_test.go | 55 ++++++++++++++++----- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/pkg/sync/kubernetes/kubernetes_sync_test.go b/pkg/sync/kubernetes/kubernetes_sync_test.go index 0d820cdeb..ecc1065fa 100644 --- a/pkg/sync/kubernetes/kubernetes_sync_test.go +++ b/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -5,28 +5,24 @@ import ( "encoding/json" "errors" "fmt" - "go.uber.org/zap/zapcore" - "k8s.io/client-go/kubernetes/scheme" "reflect" "strings" "testing" "time" - "github.com/open-feature/flagd/pkg/sync" - "github.com/open-feature/flagd/pkg/logger" - "k8s.io/client-go/tools/cache" - - "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" - + "github.com/open-feature/flagd/pkg/sync" "github.com/open-feature/open-feature-operator/apis/core/v1alpha1" + "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic/fake" - + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client" fakeClient "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" ) var Metadata = v1.TypeMeta{ @@ -615,6 +611,7 @@ func TestSync(t *testing.T) { } fakeReadClient := newFakeReadClient(validFFCfg) l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + t.Run("Happy Path", func(t *testing.T) { k := Sync{ URI: fmt.Sprintf("%s/%s", ns, name), @@ -627,7 +624,10 @@ func TestSync(t *testing.T) { if e != nil { t.Errorf("Unexpected error: %v", e) } - dataChannel := make(chan sync.DataSync) + if k.IsReady() { + t.Errorf("The Sync should not be ready") + } + dataChannel := make(chan sync.DataSync, 1) go func() { if err := k.Sync(context.TODO(), dataChannel); err != nil { t.Errorf("Unexpected error: %v", e) @@ -650,6 +650,7 @@ func TestSync(t *testing.T) { if e != nil { t.Errorf("Unexpected error: %v", e) } + // no messages expected dataChannel := make(chan sync.DataSync) if err := k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { t.Errorf("Unexpected error: %v", err) @@ -667,7 +668,7 @@ func TestSync(t *testing.T) { if e != nil { t.Errorf("Unexpected error: %v", e) } - dataChannel := make(chan sync.DataSync) + dataChannel := make(chan sync.DataSync, 1) go func() { if err := k.ReSync(context.TODO(), dataChannel); err != nil { t.Errorf("Unexpected error: %v", e) @@ -690,6 +691,7 @@ func TestSync(t *testing.T) { if e != nil { t.Errorf("Unexpected error: %v", e) } + // no messages expected dataChannel := make(chan sync.DataSync) if err := k.ReSync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { t.Errorf("Unexpected error: %v", err) @@ -722,6 +724,10 @@ func TestNotify(t *testing.T) { c := make(chan INotify) go func() { k.notify(context.TODO(), c) }() + if k.IsReady() { + t.Errorf("The Sync should not be ready") + } + // wait for informer callbacks to be set msg := <-c if msg.GetEvent().EventType != DefaultEventTypeReady { @@ -765,7 +771,7 @@ func TestNotify(t *testing.T) { // validate we don't crash parsing wrong spec cfg["spec"] = map[string]interface{}{ - "featureFlagSpec": int64(12), + "featureFlagSpec": int64(12), // we expect string here } ff.SetUnstructuredContent(cfg) _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Create(context.TODO(), ff, v1.CreateOptions{}) @@ -786,6 +792,31 @@ func TestNotify(t *testing.T) { } } +func Test_k8sClusterConfig(t *testing.T) { + t.Run("Cannot find KUBECONFIG file", func(tt *testing.T) { + tt.Setenv("KUBECONFIG", "") + _, err := k8sClusterConfig() + if err == nil { + tt.Error("Expected error but got none") + } + }) + t.Run("KUBECONFIG file not existing", func(tt *testing.T) { + tt.Setenv("KUBECONFIG", "value") + _, err := k8sClusterConfig() + if err == nil { + tt.Error("Expected error but got none") + } + }) + t.Run("Default REST Config and missing svc account", func(tt *testing.T) { + tt.Setenv("KUBERNETES_SERVICE_HOST", "127.0.0.1") + tt.Setenv("KUBERNETES_SERVICE_PORT", "8080") + _, err := k8sClusterConfig() + if err == nil { + tt.Error("Expected error but got none") + } + }) +} + func newFakeReadClient(objs ...client.Object) client.Client { _ = v1alpha1.AddToScheme(scheme.Scheme) return fakeClient.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(objs...).Build() From 62de8790b4d46b9437d6e85f6a65a159e0596363 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Tue, 7 Mar 2023 11:31:41 +0100 Subject: [PATCH 3/5] improve tests Signed-off-by: Giovanni Liva --- pkg/runtime/from_config.go | 14 +- pkg/sync/kubernetes/kubernetes_sync.go | 76 +++---- pkg/sync/kubernetes/kubernetes_sync_test.go | 208 +++++++++++--------- 3 files changed, 160 insertions(+), 138 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index e61a8392b..74143ea0a 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -136,16 +136,16 @@ func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, e if err != nil { return nil, err } - return &kubernetes.Sync{ - Logger: logger.WithFields( + return kubernetes.NewK8sSync( + logger.WithFields( zap.String("component", "sync"), zap.String("sync", "kubernetes"), ), - URI: regCrd.ReplaceAllString(uri, ""), - ProviderArgs: r.config.ProviderArgs, - ReadClient: reader, - DynamicClient: dynamic, - }, nil + regCrd.ReplaceAllString(uri, ""), + r.config.ProviderArgs, + reader, + dynamic, + ), nil } func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync { diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 64d684054..ea203c5cf 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -23,22 +23,22 @@ import ( ) var ( - resyncPeriod = 1 * time.Minute - apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) - featurFlagConfigurationResource = v1alpha1.GroupVersion.WithResource("featureflagconfigurations") + resyncPeriod = 1 * time.Minute + apiVersion = fmt.Sprintf("%s/%s", v1alpha1.GroupVersion.Group, v1alpha1.GroupVersion.Version) + featureFlagConfigurationResource = v1alpha1.GroupVersion.WithResource("featureflagconfigurations") ) type Sync struct { - Logger *logger.Logger - ProviderArgs sync.ProviderArgs - URI string - ReadClient client.Reader - DynamicClient dynamic.Interface - - ready bool - namespace string - crdName string - informer cache.SharedInformer + URI string + + ready bool + namespace string + crdName string + logger *logger.Logger + providerArgs sync.ProviderArgs + readClient client.Reader + dynamicClient dynamic.Interface + informer cache.SharedInformer } func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { @@ -64,9 +64,9 @@ func (k *Sync) Init(ctx context.Context) error { // The created informer will not do resyncs if the given defaultEventHandlerResyncPeriod is zero. // For more details on resync implications refer to tools/cache/shared_informer.go - factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.DynamicClient, resyncPeriod, k.namespace, nil) + factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(k.dynamicClient, resyncPeriod, k.namespace, nil) - k.informer = factory.ForResource(featurFlagConfigurationResource).Informer() + k.informer = factory.ForResource(featureFlagConfigurationResource).Informer() return nil } @@ -76,12 +76,12 @@ func (k *Sync) IsReady() bool { } func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - k.Logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI)) + k.logger.Info(fmt.Sprintf("starting kubernetes sync notifier for resource: %s", k.URI)) // Initial fetch fetch, err := k.fetch(ctx) if err != nil { - k.Logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error())) + k.logger.Error(fmt.Sprintf("error with the initial fetch: %s", err.Error())) return err } @@ -117,27 +117,27 @@ func (k *Sync) watcher(ctx context.Context, notifies chan INotify, dataSync chan case w := <-notifies: switch w.GetEvent().EventType { case DefaultEventTypeCreate: - k.Logger.Debug("new configuration created") + k.logger.Debug("new configuration created") msg, err := k.fetch(ctx) if err != nil { - k.Logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error())) + k.logger.Error(fmt.Sprintf("error fetching after create notification: %s", err.Error())) continue } dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL} case DefaultEventTypeModify: - k.Logger.Debug("Configuration modified") + k.logger.Debug("Configuration modified") msg, err := k.fetch(ctx) if err != nil { - k.Logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error())) + k.logger.Error(fmt.Sprintf("error fetching after update notification: %s", err.Error())) continue } dataSync <- sync.DataSync{FlagData: msg, Source: k.URI, Type: sync.ALL} case DefaultEventTypeDelete: - k.Logger.Debug("configuration deleted") + k.logger.Debug("configuration deleted") case DefaultEventTypeReady: - k.Logger.Debug("notifier ready") + k.logger.Debug("notifier ready") k.ready = true } } @@ -158,13 +158,13 @@ func (k *Sync) fetch(ctx context.Context) (string, error) { return "", err } - k.Logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI)) + k.logger.Debug(fmt.Sprintf("resource %s served from the informer cache", k.URI)) return configuration.Spec.FeatureFlagSpec, nil } // fallback to API access - this is an informer cache miss. Could happen at the startup where cache is not filled var ff v1alpha1.FeatureFlagConfiguration - err = k.ReadClient.Get(ctx, client.ObjectKey{ + err = k.readClient.Get(ctx, client.ObjectKey{ Name: k.crdName, Namespace: k.namespace, }, &ff) @@ -172,7 +172,7 @@ func (k *Sync) fetch(ctx context.Context) (string, error) { return "", err } - k.Logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI)) + k.logger.Debug(fmt.Sprintf("resource %s served from API server", k.URI)) return ff.Spec.FeatureFlagSpec, nil } @@ -183,25 +183,25 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { } if _, err := k.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - k.Logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name)) + k.logger.Info(fmt.Sprintf("kube sync notifier event: add: %s %s", objectKey.Namespace, objectKey.Name)) if err := commonHandler(obj, objectKey, DefaultEventTypeCreate, c); err != nil { - k.Logger.Warn(err.Error()) + k.logger.Warn(err.Error()) } }, UpdateFunc: func(oldObj, newObj interface{}) { - k.Logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name)) + k.logger.Info(fmt.Sprintf("kube sync notifier event: update: %s %s", objectKey.Namespace, objectKey.Name)) if err := updateFuncHandler(oldObj, newObj, objectKey, c); err != nil { - k.Logger.Warn(err.Error()) + k.logger.Warn(err.Error()) } }, DeleteFunc: func(obj interface{}) { - k.Logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name)) + k.logger.Info(fmt.Sprintf("kube sync notifier event: delete: %s %s", objectKey.Namespace, objectKey.Name)) if err := commonHandler(obj, objectKey, DefaultEventTypeDelete, c); err != nil { - k.Logger.Warn(err.Error()) + k.logger.Warn(err.Error()) } }, }); err != nil { - k.Logger.Fatal(err.Error()) + k.logger.Fatal(err.Error()) } c <- &Notifier{ @@ -312,6 +312,16 @@ func k8sClusterConfig() (*rest.Config, error) { return clusterConfig, nil } +func NewK8sSync(logger *logger.Logger, uri string, providerArgs sync.ProviderArgs, reader client.Reader, dynamic dynamic.Interface) *Sync { + return &Sync{ + logger: logger, + URI: uri, + providerArgs: providerArgs, + readClient: reader, + dynamicClient: dynamic, + } +} + func GetClients() (client.Reader, dynamic.Interface, error) { clusterConfig, err := k8sClusterConfig() if err != nil { diff --git a/pkg/sync/kubernetes/kubernetes_sync_test.go b/pkg/sync/kubernetes/kubernetes_sync_test.go index ecc1065fa..7ea7d4df5 100644 --- a/pkg/sync/kubernetes/kubernetes_sync_test.go +++ b/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -445,11 +445,11 @@ func TestSync_fetch(t *testing.T) { GetByKeyFunc: tt.args.InformerGetFunc, }, }, - ReadClient: &MockClient{ + readClient: &MockClient{ getResponse: tt.args.ClientResponse, clientErr: tt.args.ClientError, }, - Logger: logger.NewLogger(nil, false), + logger: logger.NewLogger(nil, false), } // Test fetch @@ -530,7 +530,7 @@ func TestSync_watcher(t *testing.T) { GetByKeyFunc: tt.args.InformerGetFunc, }, }, - Logger: logger.NewLogger(nil, false), + logger: logger.NewLogger(nil, false), } // create communication channels with buffer to so that calls are non-blocking @@ -579,7 +579,7 @@ func TestInit(t *testing.T) { fakeClient := fake.NewSimpleDynamicClient(scheme, ff) k := Sync{ URI: fmt.Sprintf("%s/%s", ns, name), - DynamicClient: fakeClient, + dynamicClient: fakeClient, namespace: ns, } e := k.Init(context.TODO()) @@ -595,7 +595,7 @@ func TestInit(t *testing.T) { }) } -func TestSync(t *testing.T) { +func TestSync_ReSync(t *testing.T) { const name = "myFF" const ns = "myNS" s := runtime.NewScheme() @@ -610,93 +610,80 @@ func TestSync(t *testing.T) { }, } fakeReadClient := newFakeReadClient(validFFCfg) - l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + l, err := logger.NewZapLogger(zapcore.FatalLevel, "console") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } - t.Run("Happy Path", func(t *testing.T) { - k := Sync{ - URI: fmt.Sprintf("%s/%s", ns, name), - DynamicClient: fakeDynamicClient, - ReadClient: fakeReadClient, - namespace: ns, - Logger: logger.NewLogger(l, true), - } - e := k.Init(context.TODO()) - if e != nil { - t.Errorf("Unexpected error: %v", e) - } - if k.IsReady() { - t.Errorf("The Sync should not be ready") - } - dataChannel := make(chan sync.DataSync, 1) - go func() { - if err := k.Sync(context.TODO(), dataChannel); err != nil { + tests := []struct { + name string + k Sync + countMsg int + async bool + }{ + { + name: "Happy Path", + k: Sync{ + URI: fmt.Sprintf("%s/%s", ns, name), + dynamicClient: fakeDynamicClient, + readClient: fakeReadClient, + namespace: ns, + logger: logger.NewLogger(l, true), + }, + countMsg: 2, // one for sync and one for resync + async: true, + }, + { + name: "CRD not found", + k: Sync{ + URI: fmt.Sprintf("doesnt%s/exist%s", ns, name), + dynamicClient: fakeDynamicClient, + readClient: fakeReadClient, + namespace: ns, + logger: logger.NewLogger(l, true), + }, + countMsg: 0, + async: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + e := tt.k.Init(context.TODO()) + if e != nil { t.Errorf("Unexpected error: %v", e) } - }() - d := <-dataChannel - if d.Type != sync.ALL { - t.Errorf("Expected %v, got %v", sync.ALL, d) - } - }) - t.Run("CRD not found", func(t *testing.T) { - k := Sync{ - URI: fmt.Sprintf("doesnt%s/exist%s", ns, name), - DynamicClient: fakeDynamicClient, - ReadClient: fakeReadClient, - namespace: ns, - Logger: logger.NewLogger(l, true), - } - e := k.Init(context.TODO()) - if e != nil { - t.Errorf("Unexpected error: %v", e) - } - // no messages expected - dataChannel := make(chan sync.DataSync) - if err := k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { - t.Errorf("Unexpected error: %v", err) - } - }) - t.Run("ReSync", func(t *testing.T) { - k := Sync{ - URI: fmt.Sprintf("%s/%s", ns, name), - DynamicClient: fakeDynamicClient, - ReadClient: fakeReadClient, - namespace: ns, - Logger: logger.NewLogger(l, true), - } - e := k.Init(context.TODO()) - if e != nil { - t.Errorf("Unexpected error: %v", e) - } - dataChannel := make(chan sync.DataSync, 1) - go func() { - if err := k.ReSync(context.TODO(), dataChannel); err != nil { - t.Errorf("Unexpected error: %v", e) + if tt.k.IsReady() { + t.Errorf("The Sync should not be ready") } - }() - d := <-dataChannel - if d.Type != sync.ALL { - t.Errorf("Expected %v, got %v", sync.ALL, d) - } - }) - t.Run("Resync CRD not found", func(t *testing.T) { - k := Sync{ - URI: fmt.Sprintf("doesnt%s/exist%s", ns, name), - DynamicClient: fakeDynamicClient, - ReadClient: fakeReadClient, - namespace: ns, - Logger: logger.NewLogger(l, true), - } - e := k.Init(context.TODO()) - if e != nil { - t.Errorf("Unexpected error: %v", e) - } - // no messages expected - dataChannel := make(chan sync.DataSync) - if err := k.ReSync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { - t.Errorf("Unexpected error: %v", err) - } - }) + dataChannel := make(chan sync.DataSync, tt.countMsg) + if tt.async { + go func() { + if err := tt.k.Sync(context.TODO(), dataChannel); err != nil { + t.Errorf("Unexpected error: %v", e) + } + if err := tt.k.ReSync(context.TODO(), dataChannel); err != nil { + t.Errorf("Unexpected error: %v", e) + } + + }() + i := tt.countMsg + for i > 0 { + d := <-dataChannel + if d.Type != sync.ALL { + t.Errorf("Expected %v, got %v", sync.ALL, d) + } + i-- + } + } else { + if err := tt.k.Sync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { + t.Errorf("Unexpected error: %v", err) + } + if err := tt.k.ReSync(context.TODO(), dataChannel); !strings.Contains(err.Error(), "not found") { + t.Errorf("Unexpected error: %v", err) + } + } + }) + } } func TestNotify(t *testing.T) { @@ -707,14 +694,17 @@ func TestNotify(t *testing.T) { cfg := getCFG(name, ns) ff.SetUnstructuredContent(cfg) fc := fake.NewSimpleDynamicClient(s, ff) - l, _ := logger.NewZapLogger(zapcore.DebugLevel, "") + l, err := logger.NewZapLogger(zapcore.FatalLevel, "console") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } k := Sync{ URI: fmt.Sprintf("%s/%s", ns, name), - DynamicClient: fc, + dynamicClient: fc, namespace: ns, - Logger: logger.NewLogger(l, true), + logger: logger.NewLogger(l, true), } - err := k.Init(context.TODO()) + err = k.Init(context.TODO()) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -738,7 +728,7 @@ func TestNotify(t *testing.T) { "empty": "", } ff.SetUnstructuredContent(cfg) - _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) + _, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -751,7 +741,7 @@ func TestNotify(t *testing.T) { old["resourceVersion"] = "newVersion" cfg["metadata"] = old ff.SetUnstructuredContent(cfg) - _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) + _, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -760,7 +750,7 @@ func TestNotify(t *testing.T) { t.Errorf("Expected message %v, got %v", DefaultEventTypeModify, msg) } // delete - err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{}) + err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -774,7 +764,7 @@ func TestNotify(t *testing.T) { "featureFlagSpec": int64(12), // we expect string here } ff.SetUnstructuredContent(cfg) - _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Create(context.TODO(), ff, v1.CreateOptions{}) + _, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).Create(context.TODO(), ff, v1.CreateOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -782,11 +772,11 @@ func TestNotify(t *testing.T) { "bump": "1", } ff.SetUnstructuredContent(cfg) - _, err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) + _, err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).UpdateStatus(context.TODO(), ff, v1.UpdateOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } - err = fc.Resource(featurFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{}) + err = fc.Resource(featureFlagConfigurationResource).Namespace(ns).Delete(context.TODO(), name, v1.DeleteOptions{}) if err != nil { t.Errorf("Unexpected error: %v", err) } @@ -817,6 +807,28 @@ func Test_k8sClusterConfig(t *testing.T) { }) } +func Test_NewK8sSync(t *testing.T) { + l, err := logger.NewZapLogger(zapcore.FatalLevel, "console") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + const uri = "myURI" + args := map[string]string{"myArg": "myVal"} + rc := newFakeReadClient() + fc := fake.NewSimpleDynamicClient(runtime.NewScheme()) + k := NewK8sSync( + logger.NewLogger(l, true), + uri, + args, + rc, + fc, + ) + if k == nil { + t.Errorf("Object not initialized properly") + } + // TODO complete asserts +} + func newFakeReadClient(objs ...client.Object) client.Client { _ = v1alpha1.AddToScheme(scheme.Scheme) return fakeClient.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(objs...).Build() From 56a2021284dfe4341b12970c52a4e613ff602ffc Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Tue, 7 Mar 2023 12:19:14 +0100 Subject: [PATCH 4/5] add more assertions Signed-off-by: Giovanni Liva --- pkg/sync/kubernetes/kubernetes_sync.go | 8 +++++- pkg/sync/kubernetes/kubernetes_sync_test.go | 27 ++++++++++++++++----- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index ea203c5cf..df444cee4 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -312,7 +312,13 @@ func k8sClusterConfig() (*rest.Config, error) { return clusterConfig, nil } -func NewK8sSync(logger *logger.Logger, uri string, providerArgs sync.ProviderArgs, reader client.Reader, dynamic dynamic.Interface) *Sync { +func NewK8sSync( + logger *logger.Logger, + uri string, + providerArgs sync.ProviderArgs, + reader client.Reader, + dynamic dynamic.Interface, +) *Sync { return &Sync{ logger: logger, URI: uri, diff --git a/pkg/sync/kubernetes/kubernetes_sync_test.go b/pkg/sync/kubernetes/kubernetes_sync_test.go index 7ea7d4df5..013b80def 100644 --- a/pkg/sync/kubernetes/kubernetes_sync_test.go +++ b/pkg/sync/kubernetes/kubernetes_sync_test.go @@ -664,7 +664,6 @@ func TestSync_ReSync(t *testing.T) { if err := tt.k.ReSync(context.TODO(), dataChannel); err != nil { t.Errorf("Unexpected error: %v", e) } - }() i := tt.countMsg for i > 0 { @@ -813,20 +812,36 @@ func Test_NewK8sSync(t *testing.T) { t.Errorf("Unexpected error: %v", err) } const uri = "myURI" - args := map[string]string{"myArg": "myVal"} + log := logger.NewLogger(l, true) + const key, value = "myKey", "myValue" + args := map[string]string{key: value} rc := newFakeReadClient() - fc := fake.NewSimpleDynamicClient(runtime.NewScheme()) + dc := fake.NewSimpleDynamicClient(runtime.NewScheme()) k := NewK8sSync( - logger.NewLogger(l, true), + log, uri, args, rc, - fc, + dc, ) if k == nil { t.Errorf("Object not initialized properly") } - // TODO complete asserts + if k.URI != uri { + t.Errorf("Object not initialized with the right URI") + } + if k.logger != log { + t.Errorf("Object not initialized with the right logger") + } + if k.providerArgs[key] != value { + t.Errorf("Object not initialized with the right arguments") + } + if k.readClient != rc { + t.Errorf("Object not initialized with the right K8s client") + } + if k.dynamicClient != dc { + t.Errorf("Object not initialized with the right K8s dynamic client") + } } func newFakeReadClient(objs ...client.Object) client.Client { From a6924528e4931b265ab3726ae83bb2845e85e373 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Wed, 8 Mar 2023 11:39:48 +0100 Subject: [PATCH 5/5] reorder functions Signed-off-by: Giovanni Liva --- pkg/sync/kubernetes/kubernetes_sync.go | 68 +++++++++++++------------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index df444cee4..f78e6cd79 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -41,6 +41,40 @@ type Sync struct { informer cache.SharedInformer } +func NewK8sSync( + logger *logger.Logger, + uri string, + providerArgs sync.ProviderArgs, + reader client.Reader, + dynamic dynamic.Interface, +) *Sync { + return &Sync{ + logger: logger, + URI: uri, + providerArgs: providerArgs, + readClient: reader, + dynamicClient: dynamic, + } +} + +func GetClients() (client.Reader, dynamic.Interface, error) { + clusterConfig, err := k8sClusterConfig() + if err != nil { + return nil, nil, err + } + + readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) + if err != nil { + return nil, nil, err + } + + dynamicClient, err := dynamic.NewForConfig(clusterConfig) + if err != nil { + return nil, nil, err + } + return readClient, dynamicClient, nil +} + func (k *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error { fetch, err := k.fetch(ctx) if err != nil { @@ -311,37 +345,3 @@ func k8sClusterConfig() (*rest.Config, error) { return clusterConfig, nil } - -func NewK8sSync( - logger *logger.Logger, - uri string, - providerArgs sync.ProviderArgs, - reader client.Reader, - dynamic dynamic.Interface, -) *Sync { - return &Sync{ - logger: logger, - URI: uri, - providerArgs: providerArgs, - readClient: reader, - dynamicClient: dynamic, - } -} - -func GetClients() (client.Reader, dynamic.Interface, error) { - clusterConfig, err := k8sClusterConfig() - if err != nil { - return nil, nil, err - } - - readClient, err := client.New(clusterConfig, client.Options{Scheme: scheme.Scheme}) - if err != nil { - return nil, nil, err - } - - dynamicClient, err := dynamic.NewForConfig(clusterConfig) - if err != nil { - return nil, nil, err - } - return readClient, dynamicClient, nil -}