From 28fd32c9112f11e6d67f3bab3643c289dadd1ce7 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 18 Feb 2023 11:00:42 +0100 Subject: [PATCH 01/15] Add basic impl structure Signed-off-by: Giovanni Liva --- pkg/service/connect_service.go | 17 +++++++++++++++-- pkg/sync/isync.go | 4 ++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index e413f2052..c879b196a 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -87,6 +87,11 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error } } +func (s *ConnectService) isReady() bool { + // check if all sync provider are loaded + return true +} + func (s *ConnectService) setupServer() (net.Listener, error) { var lis net.Listener var err error @@ -107,13 +112,21 @@ func (s *ConnectService) setupServer() (net.Listener, error) { }) h := Handler("", mdlw, mux) go func() { - s.Logger.Info(fmt.Sprintf("metrics listening at %d", s.ConnectServiceConfiguration.MetricsPort)) + s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", s.ConnectServiceConfiguration.MetricsPort)) server := &http.Server{ Addr: fmt.Sprintf(":%d", s.ConnectServiceConfiguration.MetricsPort), ReadHeaderTimeout: 3 * time.Second, } server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/metrics" { + if r.URL.Path == "/healthz" { + w.WriteHeader(http.StatusOK) + } else if r.URL.Path == "/readyz" { + if s.isReady() { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusPreconditionFailed) + } + } else if r.URL.Path == "/metrics" { promhttp.Handler().ServeHTTP(w, r) } else { w.WriteHeader(http.StatusNotFound) diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 86ed7c2d8..7cbe68bb7 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -38,6 +38,10 @@ ISync implementations watch for changes in the flag sources (HTTP backend, local value and communicate to the Runtime with DataSync channel */ type ISync interface { + + // Init is used by the sync provider to initialize its data structures and external dependencies. + Init(ctx context.Context) error + // Sync is the contract between Runtime and sync implementation. // Note that, it is expected to return the first data sync as soon as possible to fill the store. Sync(ctx context.Context, dataSync chan<- DataSync) error From bfe20f50df86d4c4b4979d09e95f42a9f386bc95 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 18 Feb 2023 22:16:29 +0100 Subject: [PATCH 02/15] add basic impl Signed-off-by: Giovanni Liva --- config/samples/example_flags.flagd.json | 134 ------------------------ pkg/runtime/runtime.go | 28 ++++- pkg/service/connect_service.go | 13 +-- pkg/service/connect_service_test.go | 9 +- pkg/service/iservice.go | 8 +- pkg/sync/file/filepath_sync.go | 4 + pkg/sync/grpc/grpc_sync.go | 4 + pkg/sync/http/http_sync.go | 4 + pkg/sync/kubernetes/kubernetes_sync.go | 4 + 9 files changed, 59 insertions(+), 149 deletions(-) delete mode 100644 config/samples/example_flags.flagd.json diff --git a/config/samples/example_flags.flagd.json b/config/samples/example_flags.flagd.json deleted file mode 100644 index 785f793a4..000000000 --- a/config/samples/example_flags.flagd.json +++ /dev/null @@ -1,134 +0,0 @@ -{ - "flags": { - "myBoolFlag": { - "state": "ENABLED", - "variants": { - "on": true, - "off": false - }, - "defaultVariant": "on" - }, - "myStringFlag": { - "state": "ENABLED", - "variants": { - "key1": "val1", - "key2": "val2" - }, - "defaultVariant": "key1" - }, - "myFloatFlag": { - "state": "ENABLED", - "variants": { - "one": 1.23, - "two": 2.34 - }, - "defaultVariant": "one" - }, - "myIntFlag": { - "state": "ENABLED", - "variants": { - "one": 1, - "two": 2 - }, - "defaultVariant": "one" - }, - "myObjectFlag": { - "state": "ENABLED", - "variants": { - "object1": { - "key": "val" - }, - "object2": { - "key": true - } - }, - "defaultVariant": "object1" - }, - "isColorYellow": { - "state": "ENABLED", - "variants": { - "on": true, - "off": false - }, - "defaultVariant": "off", - "targeting": { - "if": [ - { - "==": [ - { - "var": [ - "color" - ] - }, - "yellow" - ] - }, - "on", - "off" - ] - } - }, - "fibAlgo": { - "variants": { - "recursive": "recursive", - "memo": "memo", - "loop": "loop", - "binet": "binet" - }, - "defaultVariant": "recursive", - "state": "ENABLED", - "targeting": { - "if": [ - { - "$ref": "emailWithFaas" - }, "binet", null - ] - } - }, - "headerColor": { - "variants": { - "red": "#FF0000", - "blue": "#0000FF", - "green": "#00FF00", - "yellow": "#FFFF00" - }, - "defaultVariant": "red", - "state": "ENABLED", - "targeting": { - "if": [ - { - "$ref": "emailWithFaas" - }, - { - "fractionalEvaluation": [ - "email", - [ - "red", - 25 - ], - [ - "blue", - 25 - ], - [ - "green", - 25 - ], - [ - "yellow", - 25 - ] - ] - }, null - ] - } - } - }, - "$evaluators": { - "emailWithFaas": { - "in": ["@faas.com", { - "var": ["email"] - }] - } - } -} diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 0b9ffefbb..982141960 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -24,6 +24,7 @@ type Runtime struct { mu msync.Mutex Evaluator eval.IEvaluator Logger *logger.Logger + ready bool } type Config struct { @@ -55,6 +56,9 @@ func (r *Runtime) Start() error { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() + // set flagd to NOT be ready to serve flag evaluations + r.setProviderReady(false) + g, gCtx := errgroup.WithContext(ctx) dataSync := make(chan sync.DataSync, len(r.SyncImpl)) @@ -70,7 +74,15 @@ func (r *Runtime) Start() error { } }) - // Start sync providers + // Init sync providers + for _, s := range r.SyncImpl { + if err := s.Init(gCtx); err != nil { + return err + } + } + r.setProviderReady(true) + + // Start sync provider for _, s := range r.SyncImpl { p := s g.Go(func() error { @@ -79,16 +91,28 @@ func (r *Runtime) Start() error { } g.Go(func() error { - return r.Service.Serve(gCtx, r.Evaluator) + return r.Service.Serve(gCtx, r.Evaluator, service.ServiceConfiguration{ + ReadinessProbe: func() bool { + return r.ready + }, + }) }) <-gCtx.Done() if err := g.Wait(); err != nil { + r.setProviderReady(false) + // TODO: try to recover from error return err } return nil } +func (r *Runtime) setProviderReady(val bool) { + r.mu.Lock() + r.ready = val + r.mu.Unlock() +} + // updateWithNotify helps to update state and notify listeners func (r *Runtime) updateWithNotify(payload sync.DataSync) { r.mu.Lock() diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index c879b196a..5b8875ca2 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -48,13 +48,13 @@ type eventingConfiguration struct { subs map[interface{}]chan Notification } -func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error { +func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf ServiceConfiguration) error { s.Eval = eval s.eventingConfiguration = &eventingConfiguration{ subs: make(map[interface{}]chan Notification), mu: &sync.RWMutex{}, } - lis, err := s.setupServer() + lis, err := s.setupServer(svcConf) if err != nil { return err } @@ -87,12 +87,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator) error } } -func (s *ConnectService) isReady() bool { - // check if all sync provider are loaded - return true -} - -func (s *ConnectService) setupServer() (net.Listener, error) { +func (s *ConnectService) setupServer(svcConf ServiceConfiguration) (net.Listener, error) { var lis net.Listener var err error mux := http.NewServeMux() @@ -121,7 +116,7 @@ func (s *ConnectService) setupServer() (net.Listener, error) { if r.URL.Path == "/healthz" { w.WriteHeader(http.StatusOK) } else if r.URL.Path == "/readyz" { - if s.isReady() { + if svcConf.ReadinessProbe() { w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusPreconditionFailed) diff --git a/pkg/service/connect_service_test.go b/pkg/service/connect_service_test.go index 10dc3eeeb..76f866f11 100644 --- a/pkg/service/connect_service_test.go +++ b/pkg/service/connect_service_test.go @@ -69,18 +69,23 @@ func TestConnectService_UnixConnection(t *testing.T) { tt.evalFields.reason, tt.evalFields.err, ).AnyTimes() - service := service.ConnectService{ + svc := service.ConnectService{ ConnectServiceConfiguration: &service.ConnectServiceConfiguration{ ServerSocketPath: tt.socketPath, }, Logger: logger.NewLogger(nil, false), } + serveConf := service.ServiceConfiguration{ + ReadinessProbe: func() bool { + return true + }, + } ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() go func() { - err := service.Serve(ctx, eval) + err := svc.Serve(ctx, eval, serveConf) fmt.Println(err) }() conn, err := grpc.Dial( diff --git a/pkg/service/iservice.go b/pkg/service/iservice.go index f289fa113..6f7dc558d 100644 --- a/pkg/service/iservice.go +++ b/pkg/service/iservice.go @@ -19,12 +19,16 @@ type Notification struct { Data map[string]interface{} `json:"data"` } -type IServiceConfiguration interface{} +type ReadinessProbe func() bool + +type ServiceConfiguration struct { + ReadinessProbe ReadinessProbe +} /* IService implementations define handlers for a particular transport, which call the IEvaluator implementation. */ type IService interface { - Serve(ctx context.Context, eval eval.IEvaluator) error + Serve(ctx context.Context, eval eval.IEvaluator, svcConf ServiceConfiguration) error Notify(n Notification) } diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index b7227991a..ecbbbb301 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -27,6 +27,10 @@ type Sync struct { // default state is used to prevent EOF errors when handling filepath delete events + empty files const defaultState = "{}" +func (fs *Sync) Init(ctx context.Context) error { + return nil +} + //nolint:funlen func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { fs.Logger.Info("Starting filepath sync notifier") diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 20d96f485..4f3a36a07 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -36,6 +36,10 @@ type Sync struct { Logger *logger.Logger } +func (g *Sync) Init(ctx context.Context) error { + return nil +} + func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { options := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index 3ce78fb80..4e6ce45e2 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -37,6 +37,10 @@ type Cron interface { Stop() } +func (hs *Sync) Init(ctx context.Context) error { + return nil +} + func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // Initial fetch fetch, err := hs.Fetch(ctx) diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index f7f094cb1..097e09873 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -32,6 +32,10 @@ type Sync struct { URI string } +func (k *Sync) Init(ctx context.Context) error { + return nil +} + func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // Initial fetch fetch, err := k.fetch(ctx) From 32e4f87a386bfc6f5cfbf5887479c9ae539006a2 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 18 Feb 2023 22:30:57 +0100 Subject: [PATCH 03/15] Refactor providers Signed-off-by: Giovanni Liva --- pkg/sync/file/filepath_sync.go | 25 +++++++++++++------------ pkg/sync/file/filepath_sync_test.go | 7 ++++++- pkg/sync/grpc/grpc_sync.go | 20 +++++++++++--------- pkg/sync/http/http_sync.go | 1 + pkg/sync/kubernetes/kubernetes_sync.go | 1 + 5 files changed, 32 insertions(+), 22 deletions(-) diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index ecbbbb301..75db38018 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -22,35 +22,36 @@ type Sync struct { ProviderArgs sync.ProviderArgs // FileType indicates the file type e.g., json, yaml/yml etc., fileType string + watcher *fsnotify.Watcher } // default state is used to prevent EOF errors when handling filepath delete events + empty files const defaultState = "{}" func (fs *Sync) Init(ctx context.Context) error { - return nil -} - -//nolint:funlen -func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { fs.Logger.Info("Starting filepath sync notifier") - watcher, err := fsnotify.NewWatcher() + w, err := fsnotify.NewWatcher() if err != nil { return err } - defer watcher.Close() - - err = watcher.Add(fs.URI) + fs.watcher = w + err = fs.watcher.Add(fs.URI) if err != nil { return err } + return nil +} + +//nolint:funlen +func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { + defer fs.watcher.Close() fs.sendDataSync(ctx, sync.ALL, dataSync) fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { select { - case event, ok := <-watcher.Events: + case event, ok := <-fs.watcher.Events: if !ok { fs.Logger.Info("filepath notifier closed") return errors.New("filepath notifier closed") @@ -63,7 +64,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { case event.Has(fsnotify.Remove): // K8s exposes config maps as symlinks. // Updates cause a remove event, we need to re-add the watcher in this case. - err = watcher.Add(fs.URI) + err := fs.watcher.Add(fs.URI) if err != nil { // the watcher could not be re-added, so the file must have been deleted fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error())) @@ -84,7 +85,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } - case err, ok := <-watcher.Errors: + case err, ok := <-fs.watcher.Errors: if !ok { return errors.New("watcher error") } diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 1d1de2f5f..81f0f7246 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -96,10 +96,15 @@ func TestSimpleSync(t *testing.T) { createFile(t, fetchDirName, fetchFileName) ctx := context.Background() + err := handler.Init(ctx) + if err != nil { + log.Fatalf("Error init sync: %s", err.Error()) + return + } dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync)) go func() { - err := handler.Sync(ctx, dataSyncChan) + err = handler.Sync(ctx, dataSyncChan) if err != nil { log.Fatalf("Error start sync: %s", err.Error()) return diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 4f3a36a07..3d9943841 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -34,19 +34,17 @@ type Sync struct { Target string ProviderID string Logger *logger.Logger + client syncv1grpc.FlagSyncService_SyncFlagsClient + options []grpc.DialOption } func (g *Sync) Init(ctx context.Context) error { - return nil -} - -func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - options := []grpc.DialOption{ + g.options = []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), } // initial dial and connection. Failure here must result in a startup failure - dial, err := grpc.DialContext(ctx, g.Target, options...) + dial, err := grpc.DialContext(ctx, g.Target, g.options...) if err != nil { g.Logger.Error(fmt.Sprintf("error establishing grpc connection: %s", err.Error())) return err @@ -58,14 +56,18 @@ func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { g.Logger.Error(fmt.Sprintf("error calling streaming operation: %s", err.Error())) return err } + g.client = syncClient + return nil +} + +func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial stream listening - err = g.handleFlagSync(syncClient, dataSync) + err := g.handleFlagSync(g.client, dataSync) g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) - // retry connection establishment for { - syncClient, ok := g.connectWithRetry(ctx, options...) + syncClient, ok := g.connectWithRetry(ctx, g.options...) if !ok { // We shall exit return nil diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index 4e6ce45e2..b54397390 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -38,6 +38,7 @@ type Cron interface { } func (hs *Sync) Init(ctx context.Context) error { + // noop return nil } diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 097e09873..c9244d25b 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -33,6 +33,7 @@ type Sync struct { } func (k *Sync) Init(ctx context.Context) error { + // noop return nil } From 12683c8714ed469e21271e5a0af508d1364befa8 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 18 Feb 2023 23:07:35 +0100 Subject: [PATCH 04/15] push readiness into sync impl Signed-off-by: Giovanni Liva --- pkg/runtime/from_config.go | 63 ++++++++++++++------------ pkg/runtime/runtime.go | 21 ++++----- pkg/sync/file/filepath_sync.go | 9 +++- pkg/sync/file/filepath_sync_test.go | 37 +++++++++++++++ pkg/sync/grpc/grpc_sync.go | 12 ++++- pkg/sync/http/http_sync.go | 5 ++ pkg/sync/isync.go | 3 ++ pkg/sync/kubernetes/kubernetes_sync.go | 8 +++- 8 files changed, 112 insertions(+), 46 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 4ff8c4e12..0aed576f9 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -77,38 +77,45 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { }) rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri)) case regCrd.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &kubernetes.Sync{ - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - ), - URI: regCrd.ReplaceAllString(uri, ""), - ProviderArgs: r.config.ProviderArgs, - }) + r.SyncImpl = append( + r.SyncImpl, + &kubernetes.Sync{ + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "kubernetes"), + ), + URI: regCrd.ReplaceAllString(uri, ""), + ProviderArgs: r.config.ProviderArgs, + }, + ) rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri)) case regURL.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &httpSync.Sync{ - URI: uri, - BearerToken: r.config.SyncBearerToken, - Client: &http.Client{ - Timeout: time.Second * 10, - }, - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "remote"), - ), - ProviderArgs: r.config.ProviderArgs, - Cron: cron.New(), - }) + r.SyncImpl = append( + r.SyncImpl, + &httpSync.Sync{ + URI: uri, + BearerToken: r.config.SyncBearerToken, + Client: &http.Client{ + Timeout: time.Second * 10, + }, + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "remote"), + ), + ProviderArgs: r.config.ProviderArgs, + Cron: cron.New(), + }) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) case regGRPC.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &grpc.Sync{ - Target: grpc.URLToGRPCTarget(uri), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - }) + r.SyncImpl = append( + r.SyncImpl, + &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + }) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ " or 'core.openfeature.dev'", uri) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 982141960..7fa9d1a45 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -56,9 +56,6 @@ func (r *Runtime) Start() error { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) defer cancel() - // set flagd to NOT be ready to serve flag evaluations - r.setProviderReady(false) - g, gCtx := errgroup.WithContext(ctx) dataSync := make(chan sync.DataSync, len(r.SyncImpl)) @@ -80,7 +77,6 @@ func (r *Runtime) Start() error { return err } } - r.setProviderReady(true) // Start sync provider for _, s := range r.SyncImpl { @@ -92,25 +88,24 @@ func (r *Runtime) Start() error { g.Go(func() error { return r.Service.Serve(gCtx, r.Evaluator, service.ServiceConfiguration{ - ReadinessProbe: func() bool { - return r.ready - }, + ReadinessProbe: r.isReady, }) }) <-gCtx.Done() if err := g.Wait(); err != nil { - r.setProviderReady(false) - // TODO: try to recover from error return err } return nil } -func (r *Runtime) setProviderReady(val bool) { - r.mu.Lock() - r.ready = val - r.mu.Unlock() +func (r *Runtime) isReady() bool { + // if at least a provider can watch for flags changes, we are ready. + pReady := false + for _, p := range r.SyncImpl { + pReady = pReady || p.IsReady() + } + return pReady } // updateWithNotify helps to update state and notify listeners diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index 75db38018..b8f7fbc62 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -23,6 +23,7 @@ type Sync struct { // FileType indicates the file type e.g., json, yaml/yml etc., fileType string watcher *fsnotify.Watcher + ready bool } // default state is used to prevent EOF errors when handling filepath delete events + empty files @@ -42,12 +43,15 @@ func (fs *Sync) Init(ctx context.Context) error { return nil } +func (fs *Sync) IsReady() bool { + return fs.ready +} + //nolint:funlen func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { - defer fs.watcher.Close() fs.sendDataSync(ctx, sync.ALL, dataSync) - + fs.ready = true fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { select { @@ -87,6 +91,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { case err, ok := <-fs.watcher.Errors: if !ok { + fs.ready = false return errors.New("watcher error") } diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 81f0f7246..7908f318b 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -186,6 +186,43 @@ func TestFilePathSync_Fetch(t *testing.T) { } } +func TestIsReadySyncFlag(t *testing.T) { + fpSync := Sync{ + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Logger: logger.NewLogger(nil, false), + } + + setupDir(t, fetchDirName) + createFile(t, fetchDirName, fetchFileName) + writeToFile(t, fetchFileContents) + defer t.Cleanup(cleanupFilePath) + if fpSync.IsReady() != false { + t.Errorf("expected not to be ready") + } + ctx := context.TODO() + err := fpSync.Init(ctx) + if err != nil { + log.Fatalf("Error init sync: %s", err.Error()) + return + } + if fpSync.IsReady() != false { + t.Errorf("expected not to be ready") + } + dataSyncChan := make(chan sync.DataSync, 1) + + go func() { + err = fpSync.Sync(ctx, dataSyncChan) + if err != nil { + log.Fatalf("Error start sync: %s", err.Error()) + return + } + }() + time.Sleep(1 * time.Second) + if fpSync.IsReady() != true { + t.Errorf("expected not to be ready") + } +} + func cleanupFilePath() { if err := os.RemoveAll(fetchDirName); err != nil { log.Fatalf("rmdir: %v", err) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 3d9943841..fe6fe49f5 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -36,6 +36,7 @@ type Sync struct { Logger *logger.Logger client syncv1grpc.FlagSyncService_SyncFlagsClient options []grpc.DialOption + ready bool } func (g *Sync) Init(ctx context.Context) error { @@ -60,21 +61,28 @@ func (g *Sync) Init(ctx context.Context) error { return nil } -func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { +func (g *Sync) IsReady() bool { + return g.ready +} +func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial stream listening + g.ready = true err := g.handleFlagSync(g.client, dataSync) + g.ready = false g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) // retry connection establishment for { + g.ready = false syncClient, ok := g.connectWithRetry(ctx, g.options...) if !ok { // We shall exit return nil } - + g.ready = true err = g.handleFlagSync(syncClient, dataSync) if err != nil { + g.ready = false g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) continue } diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index b54397390..dc7394c65 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -42,6 +42,11 @@ func (hs *Sync) Init(ctx context.Context) error { return nil } +func (hs *Sync) IsReady() bool { + // we cannot reliably check external HTTP(s) sources + return true +} + func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // Initial fetch fetch, err := hs.Fetch(ctx) diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 7cbe68bb7..126863a8a 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -45,6 +45,9 @@ type ISync interface { // Sync is the contract between Runtime and sync implementation. // Note that, it is expected to return the first data sync as soon as possible to fill the store. Sync(ctx context.Context, dataSync chan<- DataSync) error + + // IsReady shall return true if the provider is ready to communicate with the Runtime + IsReady() bool } // DataSync is the data contract between Runtime and sync implementations diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index c9244d25b..8767453c5 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -30,6 +30,7 @@ type Sync struct { ProviderArgs sync.ProviderArgs client client.Client URI string + ready bool } func (k *Sync) Init(ctx context.Context) error { @@ -37,6 +38,11 @@ func (k *Sync) Init(ctx context.Context) error { return nil } +func (k *Sync) IsReady() bool { + // we cannot reliably check external HTTP(s) sources + return k.ready +} + func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // Initial fetch fetch, err := k.fetch(ctx) @@ -203,8 +209,8 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { EventType: DefaultEventTypeReady, }, } - informer.Run(ctx.Done()) + k.ready = true } func createFuncHandler(obj interface{}, object client.ObjectKey, c chan<- INotify) error { From f48bd534a8761ddfc2c8b0b560f7cafb3ef27bf7 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 18 Feb 2023 23:32:39 +0100 Subject: [PATCH 05/15] Add RWLock Signed-off-by: Giovanni Liva --- pkg/runtime/runtime.go | 3 +-- pkg/service/connect_service.go | 13 +++++++------ pkg/service/connect_service_test.go | 2 +- pkg/service/iservice.go | 4 ++-- pkg/sync/file/filepath_sync.go | 20 +++++++++++++++----- pkg/sync/file/filepath_sync_test.go | 22 +++++++++++----------- pkg/sync/grpc/grpc_sync.go | 17 ++++++++++++----- pkg/sync/isync.go | 1 - 8 files changed, 49 insertions(+), 33 deletions(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 7fa9d1a45..154b1ed04 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -24,7 +24,6 @@ type Runtime struct { mu msync.Mutex Evaluator eval.IEvaluator Logger *logger.Logger - ready bool } type Config struct { @@ -87,7 +86,7 @@ func (r *Runtime) Start() error { } g.Go(func() error { - return r.Service.Serve(gCtx, r.Evaluator, service.ServiceConfiguration{ + return r.Service.Serve(gCtx, r.Evaluator, service.Configuration{ ReadinessProbe: r.isReady, }) }) diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index 5b8875ca2..ca11fe7dd 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -48,7 +48,7 @@ type eventingConfiguration struct { subs map[interface{}]chan Notification } -func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf ServiceConfiguration) error { +func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error { s.Eval = eval s.eventingConfiguration = &eventingConfiguration{ subs: make(map[interface{}]chan Notification), @@ -87,7 +87,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon } } -func (s *ConnectService) setupServer(svcConf ServiceConfiguration) (net.Listener, error) { +func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error) { var lis net.Listener var err error mux := http.NewServeMux() @@ -113,17 +113,18 @@ func (s *ConnectService) setupServer(svcConf ServiceConfiguration) (net.Listener ReadHeaderTimeout: 3 * time.Second, } server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path == "/healthz" { + switch r.URL.Path { + case "/healthz": w.WriteHeader(http.StatusOK) - } else if r.URL.Path == "/readyz" { + case "/readyz": if svcConf.ReadinessProbe() { w.WriteHeader(http.StatusOK) } else { w.WriteHeader(http.StatusPreconditionFailed) } - } else if r.URL.Path == "/metrics" { + case "/metric": promhttp.Handler().ServeHTTP(w, r) - } else { + default: w.WriteHeader(http.StatusNotFound) } }) diff --git a/pkg/service/connect_service_test.go b/pkg/service/connect_service_test.go index 76f866f11..4616f3319 100644 --- a/pkg/service/connect_service_test.go +++ b/pkg/service/connect_service_test.go @@ -75,7 +75,7 @@ func TestConnectService_UnixConnection(t *testing.T) { }, Logger: logger.NewLogger(nil, false), } - serveConf := service.ServiceConfiguration{ + serveConf := service.Configuration{ ReadinessProbe: func() bool { return true }, diff --git a/pkg/service/iservice.go b/pkg/service/iservice.go index 6f7dc558d..1f1603d2f 100644 --- a/pkg/service/iservice.go +++ b/pkg/service/iservice.go @@ -21,7 +21,7 @@ type Notification struct { type ReadinessProbe func() bool -type ServiceConfiguration struct { +type Configuration struct { ReadinessProbe ReadinessProbe } @@ -29,6 +29,6 @@ type ServiceConfiguration struct { IService implementations define handlers for a particular transport, which call the IEvaluator implementation. */ type IService interface { - Serve(ctx context.Context, eval eval.IEvaluator, svcConf ServiceConfiguration) error + Serve(ctx context.Context, eval eval.IEvaluator, svcConf Configuration) error Notify(n Notification) } diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index b8f7fbc62..b972c13f8 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strings" + lock "sync" "github.com/open-feature/flagd/pkg/sync" @@ -21,9 +22,10 @@ type Sync struct { Logger *logger.Logger ProviderArgs sync.ProviderArgs // FileType indicates the file type e.g., json, yaml/yml etc., - fileType string - watcher *fsnotify.Watcher - ready bool + fileType string + watcher *fsnotify.Watcher + ready bool + readyLock *lock.RWMutex } // default state is used to prevent EOF errors when handling filepath delete events + empty files @@ -44,14 +46,22 @@ func (fs *Sync) Init(ctx context.Context) error { } func (fs *Sync) IsReady() bool { + fs.readyLock.RLock() + defer fs.readyLock.RUnlock() return fs.ready } +func (fs *Sync) setReady(val bool) { + fs.readyLock.Lock() + defer fs.readyLock.Unlock() + fs.ready = val +} + //nolint:funlen func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { defer fs.watcher.Close() fs.sendDataSync(ctx, sync.ALL, dataSync) - fs.ready = true + fs.setReady(true) fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { select { @@ -91,7 +101,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { case err, ok := <-fs.watcher.Errors: if !ok { - fs.ready = false + fs.setReady(false) return errors.New("watcher error") } diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 7908f318b..6062dfa4b 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -84,11 +84,6 @@ func TestSimpleSync(t *testing.T) { }, } - handler := Sync{ - URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), - Logger: logger.NewLogger(nil, false), - } - for test, tt := range tests { t.Run(test, func(t *testing.T) { defer t.Cleanup(cleanupFilePath) @@ -96,14 +91,19 @@ func TestSimpleSync(t *testing.T) { createFile(t, fetchDirName, fetchFileName) ctx := context.Background() - err := handler.Init(ctx) - if err != nil { - log.Fatalf("Error init sync: %s", err.Error()) - return - } + dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync)) go func() { + handler := Sync{ + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Logger: logger.NewLogger(nil, false), + } + err := handler.Init(ctx) + if err != nil { + log.Fatalf("Error init sync: %s", err.Error()) + return + } err = handler.Sync(ctx, dataSyncChan) if err != nil { log.Fatalf("Error start sync: %s", err.Error()) @@ -202,7 +202,7 @@ func TestIsReadySyncFlag(t *testing.T) { ctx := context.TODO() err := fpSync.Init(ctx) if err != nil { - log.Fatalf("Error init sync: %s", err.Error()) + log.Printf("Error init sync: %s", err.Error()) return } if fpSync.IsReady() != false { diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index fe6fe49f5..c47bcc184 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "strings" + lock "sync" "time" "google.golang.org/grpc/credentials/insecure" @@ -37,6 +38,7 @@ type Sync struct { client syncv1grpc.FlagSyncService_SyncFlagsClient options []grpc.DialOption ready bool + readyLock lock.RWMutex } func (g *Sync) Init(ctx context.Context) error { @@ -65,24 +67,29 @@ func (g *Sync) IsReady() bool { return g.ready } +func (g *Sync) setReady(val bool) { + g.readyLock.Lock() + defer g.readyLock.Unlock() + g.ready = val +} + func (g *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { // initial stream listening - g.ready = true + g.setReady(true) err := g.handleFlagSync(g.client, dataSync) - g.ready = false g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) // retry connection establishment for { - g.ready = false + g.setReady(false) syncClient, ok := g.connectWithRetry(ctx, g.options...) if !ok { // We shall exit return nil } - g.ready = true + g.setReady(true) err = g.handleFlagSync(syncClient, dataSync) if err != nil { - g.ready = false + g.setReady(false) g.Logger.Warn(fmt.Sprintf("error with stream listener: %s", err.Error())) continue } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index 126863a8a..7af60ff9d 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -38,7 +38,6 @@ ISync implementations watch for changes in the flag sources (HTTP backend, local value and communicate to the Runtime with DataSync channel */ type ISync interface { - // Init is used by the sync provider to initialize its data structures and external dependencies. Init(ctx context.Context) error From e65d13dbf61d9fb1dbedde27f6b06eefbe80e026 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sat, 18 Feb 2023 23:15:04 +0100 Subject: [PATCH 06/15] polish Signed-off-by: Giovanni Liva --- config/samples/example_flags.flagd.json | 134 ++++++++++++++++++++++++ pkg/sync/file/filepath_sync_test.go | 2 +- pkg/sync/grpc/grpc_sync.go | 2 +- 3 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 config/samples/example_flags.flagd.json diff --git a/config/samples/example_flags.flagd.json b/config/samples/example_flags.flagd.json new file mode 100644 index 000000000..0314b5eda --- /dev/null +++ b/config/samples/example_flags.flagd.json @@ -0,0 +1,134 @@ +{ + "flags": { + "myBoolFlag": { + "state": "ENABLED", + "variants": { + "on": true, + "off": false + }, + "defaultVariant": "on" + }, + "myStringFlag": { + "state": "ENABLED", + "variants": { + "key1": "val1", + "key2": "val2" + }, + "defaultVariant": "key1" + }, + "myFloatFlag": { + "state": "ENABLED", + "variants": { + "one": 1.23, + "two": 2.34 + }, + "defaultVariant": "one" + }, + "myIntFlag": { + "state": "ENABLED", + "variants": { + "one": 1, + "two": 2 + }, + "defaultVariant": "one" + }, + "myObjectFlag": { + "state": "ENABLED", + "variants": { + "object1": { + "key": "val" + }, + "object2": { + "key": true + } + }, + "defaultVariant": "object1" + }, + "isColorYellow": { + "state": "ENABLED", + "variants": { + "on": true, + "off": false + }, + "defaultVariant": "off", + "targeting": { + "if": [ + { + "==": [ + { + "var": [ + "color" + ] + }, + "yellow" + ] + }, + "on", + "off" + ] + } + }, + "fibAlgo": { + "variants": { + "recursive": "recursive", + "memo": "memo", + "loop": "loop", + "binet": "binet" + }, + "defaultVariant": "recursive", + "state": "ENABLED", + "targeting": { + "if": [ + { + "$ref": "emailWithFaas" + }, "binet", null + ] + } + }, + "headerColor": { + "variants": { + "red": "#FF0000", + "blue": "#0000FF", + "green": "#00FF00", + "yellow": "#FFFF00" + }, + "defaultVariant": "red", + "state": "ENABLED", + "targeting": { + "if": [ + { + "$ref": "emailWithFaas" + }, + { + "fractionalEvaluation": [ + "email", + [ + "red", + 25 + ], + [ + "blue", + 25 + ], + [ + "green", + 25 + ], + [ + "yellow", + 25 + ] + ] + }, null + ] + } + } + }, + "$evaluators": { + "emailWithFaas": { + "in": ["@faas.com", { + "var": ["email"] + }] + } + } +} diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 6062dfa4b..6116387f0 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -219,7 +219,7 @@ func TestIsReadySyncFlag(t *testing.T) { }() time.Sleep(1 * time.Second) if fpSync.IsReady() != true { - t.Errorf("expected not to be ready") + t.Errorf("expected to be ready") } } diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index c47bcc184..64ab08c12 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -38,7 +38,7 @@ type Sync struct { client syncv1grpc.FlagSyncService_SyncFlagsClient options []grpc.DialOption ready bool - readyLock lock.RWMutex + readyLock *lock.RWMutex } func (g *Sync) Init(ctx context.Context) error { From 4a93add3e9d38b0ed431fbf042fe65c080fe4964 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Sun, 19 Feb 2023 00:10:19 +0100 Subject: [PATCH 07/15] refactor Signed-off-by: Giovanni Liva --- pkg/runtime/from_config.go | 96 ++++++++++++++++++----------- pkg/sync/file/filepath_sync.go | 21 +++---- pkg/sync/file/filepath_sync_test.go | 3 + pkg/sync/grpc/grpc_sync.go | 10 +-- 4 files changed, 79 insertions(+), 51 deletions(-) diff --git a/pkg/runtime/from_config.go b/pkg/runtime/from_config.go index 0aed576f9..7e313de9c 100644 --- a/pkg/runtime/from_config.go +++ b/pkg/runtime/from_config.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "regexp" + msync "sync" "time" "github.com/open-feature/flagd/pkg/eval" @@ -67,55 +68,28 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { for _, uri := range r.config.SyncURI { switch uriB := []byte(uri); { case regFile.Match(uriB): - r.SyncImpl = append(r.SyncImpl, &file.Sync{ - URI: regFile.ReplaceAllString(uri, ""), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "filepath"), - ), - ProviderArgs: r.config.ProviderArgs, - }) + r.SyncImpl = append( + r.SyncImpl, + r.newFile(uri, logger), + ) rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", uri)) case regCrd.Match(uriB): r.SyncImpl = append( r.SyncImpl, - &kubernetes.Sync{ - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - ), - URI: regCrd.ReplaceAllString(uri, ""), - ProviderArgs: r.config.ProviderArgs, - }, + r.newK8s(uri, logger), ) rtLogger.Debug(fmt.Sprintf("using kubernetes sync-provider for: %s", uri)) case regURL.Match(uriB): r.SyncImpl = append( r.SyncImpl, - &httpSync.Sync{ - URI: uri, - BearerToken: r.config.SyncBearerToken, - Client: &http.Client{ - Timeout: time.Second * 10, - }, - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "remote"), - ), - ProviderArgs: r.config.ProviderArgs, - Cron: cron.New(), - }) + r.newHTTP(uri, logger), + ) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %q", uri)) case regGRPC.Match(uriB): r.SyncImpl = append( r.SyncImpl, - &grpc.Sync{ - Target: grpc.URLToGRPCTarget(uri), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "grpc"), - ), - }) + r.newGRPC(uri, logger), + ) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ " or 'core.openfeature.dev'", uri) @@ -123,3 +97,53 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { } return nil } + +func (r *Runtime) newGRPC(uri string, logger *logger.Logger) *grpc.Sync { + return &grpc.Sync{ + Target: grpc.URLToGRPCTarget(uri), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "grpc"), + ), + Mux: &msync.RWMutex{}, + } +} + +func (r *Runtime) newHTTP(uri string, logger *logger.Logger) *httpSync.Sync { + return &httpSync.Sync{ + URI: uri, + BearerToken: r.config.SyncBearerToken, + Client: &http.Client{ + Timeout: time.Second * 10, + }, + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "remote"), + ), + ProviderArgs: r.config.ProviderArgs, + Cron: cron.New(), + } +} + +func (r *Runtime) newK8s(uri string, logger *logger.Logger) *kubernetes.Sync { + return &kubernetes.Sync{ + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "kubernetes"), + ), + URI: regCrd.ReplaceAllString(uri, ""), + ProviderArgs: r.config.ProviderArgs, + } +} + +func (r *Runtime) newFile(uri string, logger *logger.Logger) *file.Sync { + return &file.Sync{ + URI: regFile.ReplaceAllString(uri, ""), + Logger: logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "filepath"), + ), + ProviderArgs: r.config.ProviderArgs, + Mux: &msync.RWMutex{}, + } +} diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index b972c13f8..5fc31ebfd 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -7,14 +7,13 @@ import ( "fmt" "os" "strings" - lock "sync" - - "github.com/open-feature/flagd/pkg/sync" + msync "sync" "gopkg.in/yaml.v3" "github.com/fsnotify/fsnotify" "github.com/open-feature/flagd/pkg/logger" + "github.com/open-feature/flagd/pkg/sync" ) type Sync struct { @@ -22,10 +21,10 @@ type Sync struct { Logger *logger.Logger ProviderArgs sync.ProviderArgs // FileType indicates the file type e.g., json, yaml/yml etc., - fileType string - watcher *fsnotify.Watcher - ready bool - readyLock *lock.RWMutex + fileType string + watcher *fsnotify.Watcher + ready bool + Mux *msync.RWMutex } // default state is used to prevent EOF errors when handling filepath delete events + empty files @@ -46,14 +45,14 @@ func (fs *Sync) Init(ctx context.Context) error { } func (fs *Sync) IsReady() bool { - fs.readyLock.RLock() - defer fs.readyLock.RUnlock() + fs.Mux.RLock() + defer fs.Mux.RUnlock() return fs.ready } func (fs *Sync) setReady(val bool) { - fs.readyLock.Lock() - defer fs.readyLock.Unlock() + fs.Mux.Lock() + defer fs.Mux.Unlock() fs.ready = val } diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 6116387f0..330973d56 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "os" + msync "sync" "testing" "time" @@ -98,6 +99,7 @@ func TestSimpleSync(t *testing.T) { handler := Sync{ URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, } err := handler.Init(ctx) if err != nil { @@ -190,6 +192,7 @@ func TestIsReadySyncFlag(t *testing.T) { fpSync := Sync{ URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, } setupDir(t, fetchDirName) diff --git a/pkg/sync/grpc/grpc_sync.go b/pkg/sync/grpc/grpc_sync.go index 64ab08c12..a36c7ac35 100644 --- a/pkg/sync/grpc/grpc_sync.go +++ b/pkg/sync/grpc/grpc_sync.go @@ -5,7 +5,7 @@ import ( "fmt" "math" "strings" - lock "sync" + msync "sync" "time" "google.golang.org/grpc/credentials/insecure" @@ -38,7 +38,7 @@ type Sync struct { client syncv1grpc.FlagSyncService_SyncFlagsClient options []grpc.DialOption ready bool - readyLock *lock.RWMutex + Mux *msync.RWMutex } func (g *Sync) Init(ctx context.Context) error { @@ -64,12 +64,14 @@ func (g *Sync) Init(ctx context.Context) error { } func (g *Sync) IsReady() bool { + g.Mux.RLock() + defer g.Mux.RUnlock() return g.ready } func (g *Sync) setReady(val bool) { - g.readyLock.Lock() - defer g.readyLock.Unlock() + g.Mux.Lock() + defer g.Mux.Unlock() g.ready = val } From 2b2b7cf7dd4e1e2cd2a6139d4c7129c124c5ddbf Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Mon, 20 Feb 2023 18:50:18 +0100 Subject: [PATCH 08/15] Review Signed-off-by: Giovanni Liva --- pkg/runtime/runtime.go | 7 ++++--- pkg/sync/kubernetes/kubernetes_sync.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 154b1ed04..682280a01 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -100,11 +100,12 @@ func (r *Runtime) Start() error { func (r *Runtime) isReady() bool { // if at least a provider can watch for flags changes, we are ready. - pReady := false for _, p := range r.SyncImpl { - pReady = pReady || p.IsReady() + if p.IsReady() { + return true + } } - return pReady + return false } // updateWithNotify helps to update state and notify listeners diff --git a/pkg/sync/kubernetes/kubernetes_sync.go b/pkg/sync/kubernetes/kubernetes_sync.go index 8767453c5..632599d67 100644 --- a/pkg/sync/kubernetes/kubernetes_sync.go +++ b/pkg/sync/kubernetes/kubernetes_sync.go @@ -85,6 +85,7 @@ func (k *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { k.Logger.Debug("configuration deleted") case DefaultEventTypeReady: k.Logger.Debug("notifier ready") + k.ready = true } } } @@ -210,7 +211,6 @@ func (k *Sync) notify(ctx context.Context, c chan<- INotify) { }, } informer.Run(ctx.Done()) - k.ready = true } func createFuncHandler(obj interface{}, object client.ObjectKey, c chan<- INotify) error { From 255948da99a0e2fb5e2c540879c4934bc4981c12 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Mon, 20 Feb 2023 18:50:52 +0100 Subject: [PATCH 09/15] typo Signed-off-by: Giovanni Liva --- pkg/service/connect_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/service/connect_service.go b/pkg/service/connect_service.go index ca11fe7dd..fa92a7e85 100644 --- a/pkg/service/connect_service.go +++ b/pkg/service/connect_service.go @@ -122,7 +122,7 @@ func (s *ConnectService) setupServer(svcConf Configuration) (net.Listener, error } else { w.WriteHeader(http.StatusPreconditionFailed) } - case "/metric": + case "/metrics": promhttp.Handler().ServeHTTP(w, r) default: w.WriteHeader(http.StatusNotFound) From 7f880b9e206405fbdbf03f92d76857e4d9beb0a6 Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Tue, 21 Feb 2023 21:36:50 +0100 Subject: [PATCH 10/15] Change condition of readyness Signed-off-by: Giovanni Liva --- pkg/runtime/runtime.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 682280a01..76c8331a9 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -101,11 +101,11 @@ func (r *Runtime) Start() error { func (r *Runtime) isReady() bool { // if at least a provider can watch for flags changes, we are ready. for _, p := range r.SyncImpl { - if p.IsReady() { - return true + if !p.IsReady() { + return false } } - return false + return true } // updateWithNotify helps to update state and notify listeners From 784c0390157143bfb471fc02d8fae36a3bce5a9d Mon Sep 17 00:00:00 2001 From: Giovanni Liva Date: Thu, 23 Feb 2023 13:32:50 +0100 Subject: [PATCH 11/15] adapt ready for http Signed-off-by: Giovanni Liva --- pkg/sync/http/http_sync.go | 8 ++++++-- pkg/sync/http/http_sync_test.go | 8 ++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pkg/sync/http/http_sync.go b/pkg/sync/http/http_sync.go index dc7394c65..c2a56460f 100644 --- a/pkg/sync/http/http_sync.go +++ b/pkg/sync/http/http_sync.go @@ -23,6 +23,7 @@ type Sync struct { LastBodySHA string Logger *logger.Logger ProviderArgs sync.ProviderArgs + ready bool } // Client defines the behaviour required of a http client @@ -43,8 +44,7 @@ func (hs *Sync) Init(ctx context.Context) error { } func (hs *Sync) IsReady() bool { - // we cannot reliably check external HTTP(s) sources - return true + return hs.ready } func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { @@ -53,6 +53,7 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { if err != nil { return err } + hs.ready = true _ = hs.Cron.AddFunc("*/5 * * * *", func() { body, err := hs.fetchBodyFromURL(ctx, hs.URI) @@ -140,15 +141,18 @@ func (hs *Sync) generateSha(body []byte) string { func (hs *Sync) Fetch(ctx context.Context) (string, error) { if hs.URI == "" { + hs.ready = false return "", errors.New("no HTTP URL string set") } body, err := hs.fetchBodyFromURL(ctx, hs.URI) if err != nil { + hs.ready = false return "", err } if len(body) != 0 { hs.LastBodySHA = hs.generateSha(body) } + hs.ready = true return string(body), nil } diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 9689d74c1..8e6a94f02 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -64,6 +64,7 @@ func TestHTTPSync_Fetch(t *testing.T) { bearerToken string lastBodySHA string handleResponse func(*testing.T, Sync, string, error) + ready bool }{ "success": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -81,6 +82,7 @@ func TestHTTPSync_Fetch(t *testing.T) { t.Errorf("expected fetched to be: '%s', got: '%s'", expected, fetched) } }, + ready: true, }, "return an error if no uri": { setup: func(t *testing.T, client *syncmock.MockClient) {}, @@ -89,6 +91,7 @@ func TestHTTPSync_Fetch(t *testing.T) { t.Error("expected err, got nil") } }, + ready: false, }, "update last body sha": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -110,6 +113,7 @@ func TestHTTPSync_Fetch(t *testing.T) { ) } }, + ready: true, }, "authorization header": { setup: func(t *testing.T, client *syncmock.MockClient) { @@ -131,6 +135,7 @@ func TestHTTPSync_Fetch(t *testing.T) { ) } }, + ready: true, }, } @@ -149,6 +154,9 @@ func TestHTTPSync_Fetch(t *testing.T) { } fetched, err := httpSync.Fetch(context.Background()) + if httpSync.ready != tt.ready { + t.Errorf("expected httpSync.ready to be: '%v', got: '%v'", tt.ready, httpSync.ready) + } tt.handleResponse(t, httpSync, fetched, err) }) } From 4dec80342eea2b8ead1cb18b84170d6a8ce8abf0 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Thu, 23 Feb 2023 13:45:55 -0500 Subject: [PATCH 12/15] fixup: improve cov, test public API Signed-off-by: Todd Baert --- pkg/sync/grpc/grpc_sync_test.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index d521372c8..41fe37c8f 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -6,6 +6,7 @@ import ( "io" "log" "net" + msync "sync" "testing" "buf.build/gen/go/open-feature/flagd/grpc/go/sync/v1/syncv1grpc" @@ -54,12 +55,14 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { Target: "grpc://test", ProviderID: "", Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, } tests := []struct { name string stream syncv1grpc.FlagSyncService_SyncFlagsClient want sync.Type + ready bool }{ { name: "State All maps to Sync All", @@ -69,7 +72,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_ALL, }, }, - want: sync.ALL, + want: sync.ALL, + ready: true, }, { name: "State Add maps to Sync Add", @@ -79,7 +83,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_ADD, }, }, - want: sync.ADD, + want: sync.ADD, + ready: true, }, { name: "State Update maps to Sync Update", @@ -89,7 +94,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_UPDATE, }, }, - want: sync.UPDATE, + want: sync.UPDATE, + ready: true, }, { name: "State Delete maps to Sync Delete", @@ -99,7 +105,8 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { State: v1.SyncState_SYNC_STATE_DELETE, }, }, - want: sync.DELETE, + want: sync.DELETE, + ready: true, }, } @@ -108,13 +115,18 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { syncChan := make(chan sync.DataSync) go func() { - err := grpcSyncImpl.handleFlagSync(test.stream, syncChan) + grpcSyncImpl.client = test.stream + err := grpcSyncImpl.Sync(context.TODO(), syncChan) if err != nil { t.Errorf("Error handling flag sync: %s", err.Error()) } }() data := <-syncChan + if grpcSyncImpl.ready != test.ready { + t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready) + } + if data.Type != test.want { t.Errorf("Returned data sync state = %v, wanted %v", data.Type, test.want) } @@ -231,6 +243,7 @@ func Test_StreamListener(t *testing.T) { Target: target, ProviderID: "", Logger: logger.NewLogger(nil, false), + Mux: &msync.RWMutex{}, } // initialize client @@ -253,7 +266,8 @@ func Test_StreamListener(t *testing.T) { // listen to stream go func() { - err := grpcSync.handleFlagSync(syncClient, syncChan) + grpcSync.client = syncClient + err := grpcSync.Sync(context.TODO(), syncChan) if err != nil { // must ignore EOF as this is returned for stream end if err != io.EOF { From 136eaf02a020e715aee30f8d3d1157f247e18986 Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Thu, 23 Feb 2023 13:59:40 -0500 Subject: [PATCH 13/15] fixup: use isready func Signed-off-by: Todd Baert --- pkg/sync/grpc/grpc_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/grpc/grpc_sync_test.go b/pkg/sync/grpc/grpc_sync_test.go index 41fe37c8f..b326a2340 100644 --- a/pkg/sync/grpc/grpc_sync_test.go +++ b/pkg/sync/grpc/grpc_sync_test.go @@ -123,7 +123,7 @@ func TestSync_BasicFlagSyncStates(t *testing.T) { }() data := <-syncChan - if grpcSyncImpl.ready != test.ready { + if grpcSyncImpl.IsReady() != test.ready { t.Errorf("expected grpcSyncImpl.ready to be: '%v', got: '%v'", test.ready, grpcSyncImpl.ready) } From bab7938cf244d391f58a43b251a917350324a39d Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Thu, 23 Feb 2023 14:24:04 -0500 Subject: [PATCH 14/15] fixup: use isready func in http Signed-off-by: Todd Baert --- pkg/sync/http/http_sync_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/sync/http/http_sync_test.go b/pkg/sync/http/http_sync_test.go index 8e6a94f02..e7e1596c4 100644 --- a/pkg/sync/http/http_sync_test.go +++ b/pkg/sync/http/http_sync_test.go @@ -154,7 +154,7 @@ func TestHTTPSync_Fetch(t *testing.T) { } fetched, err := httpSync.Fetch(context.Background()) - if httpSync.ready != tt.ready { + if httpSync.IsReady() != tt.ready { t.Errorf("expected httpSync.ready to be: '%v', got: '%v'", tt.ready, httpSync.ready) } tt.handleResponse(t, httpSync, fetched, err) From 35451fa3aef5503e081498dda750f6dfb7bb33ce Mon Sep 17 00:00:00 2001 From: Todd Baert Date: Mon, 27 Feb 2023 12:24:37 -0500 Subject: [PATCH 15/15] Update pkg/runtime/runtime.go Signed-off-by: Todd Baert --- pkg/runtime/runtime.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/runtime/runtime.go b/pkg/runtime/runtime.go index 76c8331a9..4772eaef3 100644 --- a/pkg/runtime/runtime.go +++ b/pkg/runtime/runtime.go @@ -99,7 +99,7 @@ func (r *Runtime) Start() error { } func (r *Runtime) isReady() bool { - // if at least a provider can watch for flags changes, we are ready. + // if all providers can watch for flag changes, we are ready. for _, p := range r.SyncImpl { if !p.IsReady() { return false