From f85dad7d81f0a4816217e87aedf0db8bf46f9646 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 23 Mar 2023 12:10:50 +0000 Subject: [PATCH 1/8] wip Signed-off-by: James Milligan --- core/pkg/runtime/from_config.go | 16 ++++----- core/pkg/service/sync/server.go | 4 +++ core/pkg/service/sync/sync_metrics.go | 36 +++++++++++++++++++ core/pkg/sync-store/sync_store.go | 50 ++++++++++++++------------- todo | 1 + 5 files changed, 75 insertions(+), 32 deletions(-) create mode 100644 core/pkg/service/sync/sync_metrics.go create mode 100644 todo diff --git a/core/pkg/runtime/from_config.go b/core/pkg/runtime/from_config.go index cccf8df3d..6472fa813 100644 --- a/core/pkg/runtime/from_config.go +++ b/core/pkg/runtime/from_config.go @@ -96,11 +96,11 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { case syncProviderFile: r.SyncImpl = append( r.SyncImpl, - r.newFile(syncProvider, logger), + NewFile(syncProvider, logger), ) rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI)) case syncProviderKubernetes: - k, err := r.newK8s(syncProvider.URI, logger) + k, err := NewK8s(syncProvider.URI, logger) if err != nil { return err } @@ -112,13 +112,13 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { case syncProviderHTTP: r.SyncImpl = append( r.SyncImpl, - r.newHTTP(syncProvider, logger), + NewHTTP(syncProvider, logger), ) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI)) case syncProviderGrpc: r.SyncImpl = append( r.SyncImpl, - r.newGRPC(syncProvider, logger), + NewGRPC(syncProvider, logger), ) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ @@ -128,7 +128,7 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { return nil } -func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { +func NewGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ URI: config.URI, Logger: logger.WithFields( @@ -141,7 +141,7 @@ func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc } } -func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { +func NewHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { return &httpSync.Sync{ URI: config.URI, Client: &http.Client{ @@ -156,7 +156,7 @@ func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *http } } -func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { +func NewK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { reader, dynamic, err := kubernetes.GetClients() if err != nil { return nil, err @@ -172,7 +172,7 @@ func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, e ), nil } -func (r *Runtime) newFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync { +func NewFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync { return &file.Sync{ URI: config.URI, Logger: logger.WithFields( diff --git a/core/pkg/service/sync/server.go b/core/pkg/service/sync/server.go index a7d7de993..dd0a12503 100644 --- a/core/pkg/service/sync/server.go +++ b/core/pkg/service/sync/server.go @@ -12,6 +12,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" iservice "github.com/open-feature/flagd/core/pkg/service" syncStore "github.com/open-feature/flagd/core/pkg/sync-store" + "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sync/errgroup" @@ -61,6 +62,7 @@ func (s *Server) Serve(ctx context.Context, svcConf iservice.Configuration) erro } return nil }) + go s.captureMetrics(gCtx) err := g.Wait() if err != nil { @@ -111,6 +113,8 @@ func (s *Server) startMetricsServer() error { } else { w.WriteHeader(http.StatusPreconditionFailed) } + case "/metrics": + promhttp.Handler().ServeHTTP(w, r) default: w.WriteHeader(http.StatusNotFound) } diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go new file mode 100644 index 000000000..dd99ebf25 --- /dev/null +++ b/core/pkg/service/sync/sync_metrics.go @@ -0,0 +1,36 @@ +package sync + +import ( + "context" + "fmt" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +func init() { + prometheus.Register(totalRequests) +} + +var totalRequests = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "sync_subscriptions_gauge", + Help: "Number of open sync subscriptions.", + }, +) + +func (s *Server) captureMetrics(ctx context.Context) { + go func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + fmt.Println("fetching metrics") + syncs := s.handler.syncStore.GetSyncMetrics() + totalRequests.Set(syncs) + } + } + }() +} diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/sync-store/sync_store.go index f0d6b5697..757ca6095 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/sync-store/sync_store.go @@ -9,9 +9,8 @@ import ( "time" "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/runtime" isync "github.com/open-feature/flagd/core/pkg/sync" - "github.com/open-feature/flagd/core/pkg/sync/file" - "github.com/open-feature/flagd/core/pkg/sync/kubernetes" "go.uber.org/zap" ) @@ -254,6 +253,18 @@ func (s *SyncStore) cleanup() { } } +func (s *SyncStore) GetSyncMetrics() float64 { + s.mu.RLock() + defer s.mu.RUnlock() + + syncs := 0 + for _, v := range s.syncHandlers { + syncs += len(v.subs) + } + + return float64(syncs) +} + type SyncBuilderInterface interface { SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error) } @@ -265,29 +276,20 @@ func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (isync.ISy switch uriB := []byte(uri); { // filepath may be used for debugging, not recommended in deployment case regFile.Match(uriB): - return &file.Sync{ - URI: regFile.ReplaceAllString(uri, ""), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "filepath"), - zap.String("target", "target"), - ), - Mux: &sync.RWMutex{}, - }, nil + return runtime.NewFile(isync.SourceConfig{ + URI: regFile.ReplaceAllString(uri, ""), + Provider: "file", + }, logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "filepath"), + zap.String("target", "target"), + )), nil case regCrd.Match(uriB): - reader, dynamic, err := kubernetes.GetClients() - if err != nil { - return nil, err - } - return kubernetes.NewK8sSync( - logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - ), - regCrd.ReplaceAllString(uri, ""), - reader, - dynamic, - ), nil + return runtime.NewK8s(regCrd.ReplaceAllString(uri, ""), logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "filepath"), + zap.String("target", "target"), + )) } return nil, fmt.Errorf("unrecognized URI: %s", uri) } diff --git a/todo b/todo new file mode 100644 index 000000000..bed8867a9 --- /dev/null +++ b/todo @@ -0,0 +1 @@ +make the constructor changes so they can be imported into sync store \ No newline at end of file From 51b1ac2e4548e8f6945c71a4ab3f8955d9a086dd Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 23 Mar 2023 12:18:08 +0000 Subject: [PATCH 2/8] nil pointer fix + export constructors Signed-off-by: James Milligan --- core/pkg/runtime/from_config.go | 24 ++++++++++++---------- core/pkg/sync-store/sync_store.go | 34 ++++++++++--------------------- 2 files changed, 24 insertions(+), 34 deletions(-) diff --git a/core/pkg/runtime/from_config.go b/core/pkg/runtime/from_config.go index cccf8df3d..ace7c9af2 100644 --- a/core/pkg/runtime/from_config.go +++ b/core/pkg/runtime/from_config.go @@ -17,6 +17,7 @@ import ( "github.com/open-feature/flagd/core/pkg/sync" "github.com/open-feature/flagd/core/pkg/sync/file" "github.com/open-feature/flagd/core/pkg/sync/grpc" + "github.com/open-feature/flagd/core/pkg/sync/grpc/credentials" httpSync "github.com/open-feature/flagd/core/pkg/sync/http" "github.com/open-feature/flagd/core/pkg/sync/kubernetes" "github.com/robfig/cron" @@ -96,11 +97,11 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { case syncProviderFile: r.SyncImpl = append( r.SyncImpl, - r.newFile(syncProvider, logger), + NewFile(syncProvider, logger), ) rtLogger.Debug(fmt.Sprintf("using filepath sync-provider for: %q", syncProvider.URI)) case syncProviderKubernetes: - k, err := r.newK8s(syncProvider.URI, logger) + k, err := NewK8s(syncProvider.URI, logger) if err != nil { return err } @@ -112,13 +113,13 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { case syncProviderHTTP: r.SyncImpl = append( r.SyncImpl, - r.newHTTP(syncProvider, logger), + NewHTTP(syncProvider, logger), ) rtLogger.Debug(fmt.Sprintf("using remote sync-provider for: %s", syncProvider.URI)) case syncProviderGrpc: r.SyncImpl = append( r.SyncImpl, - r.newGRPC(syncProvider, logger), + NewGRPC(syncProvider, logger), ) default: return fmt.Errorf("invalid sync uri argument: %s, must start with 'file:', 'http(s)://', 'grpc://',"+ @@ -128,20 +129,21 @@ func (r *Runtime) setSyncImplFromConfig(logger *logger.Logger) error { return nil } -func (r *Runtime) newGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { +func NewGRPC(config sync.SourceConfig, logger *logger.Logger) *grpc.Sync { return &grpc.Sync{ URI: config.URI, Logger: logger.WithFields( zap.String("component", "sync"), zap.String("sync", "grpc"), ), - CertPath: config.CertPath, - ProviderID: config.ProviderID, - Selector: config.Selector, + CertPath: config.CertPath, + ProviderID: config.ProviderID, + Selector: config.Selector, + CredentialBuilder: &credentials.CredentialBuilder{}, } } -func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { +func NewHTTP(config sync.SourceConfig, logger *logger.Logger) *httpSync.Sync { return &httpSync.Sync{ URI: config.URI, Client: &http.Client{ @@ -156,7 +158,7 @@ func (r *Runtime) newHTTP(config sync.SourceConfig, logger *logger.Logger) *http } } -func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { +func NewK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, error) { reader, dynamic, err := kubernetes.GetClients() if err != nil { return nil, err @@ -172,7 +174,7 @@ func (r *Runtime) newK8s(uri string, logger *logger.Logger) (*kubernetes.Sync, e ), nil } -func (r *Runtime) newFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync { +func NewFile(config sync.SourceConfig, logger *logger.Logger) *file.Sync { return &file.Sync{ URI: config.URI, Logger: logger.WithFields( diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/sync-store/sync_store.go index f0d6b5697..165701bcb 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/sync-store/sync_store.go @@ -9,9 +9,8 @@ import ( "time" "github.com/open-feature/flagd/core/pkg/logger" + "github.com/open-feature/flagd/core/pkg/runtime" isync "github.com/open-feature/flagd/core/pkg/sync" - "github.com/open-feature/flagd/core/pkg/sync/file" - "github.com/open-feature/flagd/core/pkg/sync/kubernetes" "go.uber.org/zap" ) @@ -265,29 +264,18 @@ func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (isync.ISy switch uriB := []byte(uri); { // filepath may be used for debugging, not recommended in deployment case regFile.Match(uriB): - return &file.Sync{ + return runtime.NewFile(isync.SourceConfig{ URI: regFile.ReplaceAllString(uri, ""), - Logger: logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "filepath"), - zap.String("target", "target"), - ), - Mux: &sync.RWMutex{}, - }, nil + }, logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "filepath"), + zap.String("target", "target"), + )), nil case regCrd.Match(uriB): - reader, dynamic, err := kubernetes.GetClients() - if err != nil { - return nil, err - } - return kubernetes.NewK8sSync( - logger.WithFields( - zap.String("component", "sync"), - zap.String("sync", "kubernetes"), - ), - regCrd.ReplaceAllString(uri, ""), - reader, - dynamic, - ), nil + return runtime.NewK8s(uri, logger.WithFields( + zap.String("component", "sync"), + zap.String("sync", "kubernetes"), + )) } return nil, fmt.Errorf("unrecognized URI: %s", uri) } From 9f07e3326a325522f1a11edfa2d0cb7764b442b3 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Thu, 23 Mar 2023 16:15:28 +0000 Subject: [PATCH 3/8] cleanup Signed-off-by: James Milligan --- core/pkg/service/sync/sync_metrics.go | 7 +++---- todo | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) delete mode 100644 todo diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go index dd99ebf25..fb1e78b88 100644 --- a/core/pkg/service/sync/sync_metrics.go +++ b/core/pkg/service/sync/sync_metrics.go @@ -2,7 +2,6 @@ package sync import ( "context" - "fmt" "time" "github.com/prometheus/client_golang/prometheus" @@ -15,8 +14,9 @@ func init() { var totalRequests = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "sync_subscriptions_gauge", - Help: "Number of open sync subscriptions.", + Namespace: "sync", + Name: "active_streams", + Help: "Number of open sync subscriptions.", }, ) @@ -27,7 +27,6 @@ func (s *Server) captureMetrics(ctx context.Context) { case <-ctx.Done(): return case <-time.After(5 * time.Second): - fmt.Println("fetching metrics") syncs := s.handler.syncStore.GetSyncMetrics() totalRequests.Set(syncs) } diff --git a/todo b/todo deleted file mode 100644 index bed8867a9..000000000 --- a/todo +++ /dev/null @@ -1 +0,0 @@ -make the constructor changes so they can be imported into sync store \ No newline at end of file From 3faddd71fb4f37f9e95978bb7ea8e0f76f690146 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Fri, 24 Mar 2023 13:18:10 +0000 Subject: [PATCH 4/8] fix conflicts Signed-off-by: James Milligan --- core/pkg/sync-store/sync_store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/sync-store/sync_store.go index 45a9cded6..eac416bb4 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/sync-store/sync_store.go @@ -264,7 +264,7 @@ func (sb *SyncBuilder) SyncFromURI(uri string, logger *logger.Logger) (isync.ISy switch uriB := []byte(uri); { // filepath may be used for debugging, not recommended in deployment case regFile.Match(uriB): - return runtime.NewFile(isync.SourceConfig{ + return runtime.NewFile(runtime.SourceConfig{ URI: regFile.ReplaceAllString(uri, ""), }, logger.WithFields( zap.String("component", "sync"), From 39da2c7ff66c7a2dc893de80d06a407e35de3f50 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Mon, 27 Mar 2023 09:52:58 +0100 Subject: [PATCH 5/8] switch to otel metric collection with prometheus exporter Signed-off-by: James Milligan --- core/pkg/service/sync/server.go | 4 +- core/pkg/service/sync/sync_metrics.go | 56 +++++++++++++++------------ core/pkg/sync-store/sync_store.go | 4 +- 3 files changed, 36 insertions(+), 28 deletions(-) diff --git a/core/pkg/service/sync/server.go b/core/pkg/service/sync/server.go index dd0a12503..9f6fac572 100644 --- a/core/pkg/service/sync/server.go +++ b/core/pkg/service/sync/server.go @@ -62,7 +62,9 @@ func (s *Server) Serve(ctx context.Context, svcConf iservice.Configuration) erro } return nil }) - go s.captureMetrics(gCtx) + g.Go(func() error { + return s.captureMetrics(ctx) + }) err := g.Wait() if err != nil { diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go index fb1e78b88..7665b205a 100644 --- a/core/pkg/service/sync/sync_metrics.go +++ b/core/pkg/service/sync/sync_metrics.go @@ -2,34 +2,40 @@ package sync import ( "context" - "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/exporters/prometheus" + api "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/sdk/metric" ) -func init() { - prometheus.Register(totalRequests) -} - -var totalRequests = promauto.NewGauge( - prometheus.GaugeOpts{ - Namespace: "sync", - Name: "active_streams", - Help: "Number of open sync subscriptions.", - }, +const ( + serviceName = "openfeature/kube-flagd-proxy" ) -func (s *Server) captureMetrics(ctx context.Context) { - go func() { - for { - select { - case <-ctx.Done(): - return - case <-time.After(5 * time.Second): - syncs := s.handler.syncStore.GetSyncMetrics() - totalRequests.Set(syncs) - } - } - }() +func (s *Server) captureMetrics(ctx context.Context) error { + exporter, err := prometheus.New() + if err != nil { + return err + } + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + meter := provider.Meter(serviceName) + + syncGuage, err := meter.Int64ObservableGauge( + "sync_active_streams", + instrument.WithDescription("number of open sync subscriptions."), + ) + if err != nil { + return err + } + + _, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error { + o.ObserveInt64(syncGuage, s.handler.syncStore.GetSyncMetrics()) + return nil + }, syncGuage) + if err != nil { + return err + } + <-ctx.Done() + return nil } diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/sync-store/sync_store.go index c35259cce..3c57c96aa 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/sync-store/sync_store.go @@ -253,7 +253,7 @@ func (s *SyncStore) cleanup() { } } -func (s *SyncStore) GetSyncMetrics() float64 { +func (s *SyncStore) GetSyncMetrics() int64 { s.mu.RLock() defer s.mu.RUnlock() @@ -262,7 +262,7 @@ func (s *SyncStore) GetSyncMetrics() float64 { syncs += len(v.subs) } - return float64(syncs) + return int64(syncs) } type SyncBuilderInterface interface { From d0691e6c633a1e3ddd34eb3b1bd549a3acf2166a Mon Sep 17 00:00:00 2001 From: James Milligan Date: Mon, 27 Mar 2023 10:06:50 +0100 Subject: [PATCH 6/8] cleanup Signed-off-by: James Milligan --- core/pkg/service/sync/sync_metrics.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go index 7665b205a..758b3c3ec 100644 --- a/core/pkg/service/sync/sync_metrics.go +++ b/core/pkg/service/sync/sync_metrics.go @@ -23,7 +23,7 @@ func (s *Server) captureMetrics(ctx context.Context) error { syncGuage, err := meter.Int64ObservableGauge( "sync_active_streams", - instrument.WithDescription("number of open sync subscriptions."), + instrument.WithDescription("number of open sync subscriptions"), ) if err != nil { return err @@ -36,6 +36,7 @@ func (s *Server) captureMetrics(ctx context.Context) error { if err != nil { return err } + <-ctx.Done() return nil } From c8a1287f829188295d012d1b0197861b7aae0533 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Mon, 27 Mar 2023 10:15:01 +0100 Subject: [PATCH 7/8] test coverage Signed-off-by: James Milligan --- core/pkg/service/sync/sync_metrics.go | 2 +- core/pkg/sync-store/sync_store.go | 2 +- core/pkg/sync-store/sync_store_test.go | 25 +++++++++++++++++++++++++ 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go index 758b3c3ec..d196b205d 100644 --- a/core/pkg/service/sync/sync_metrics.go +++ b/core/pkg/service/sync/sync_metrics.go @@ -30,7 +30,7 @@ func (s *Server) captureMetrics(ctx context.Context) error { } _, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error { - o.ObserveInt64(syncGuage, s.handler.syncStore.GetSyncMetrics()) + o.ObserveInt64(syncGuage, s.handler.syncStore.GetActiveSubscriptionsInt64()) return nil }, syncGuage) if err != nil { diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/sync-store/sync_store.go index 3c57c96aa..0f641de04 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/sync-store/sync_store.go @@ -253,7 +253,7 @@ func (s *SyncStore) cleanup() { } } -func (s *SyncStore) GetSyncMetrics() int64 { +func (s *SyncStore) GetActiveSubscriptionsInt64() int64 { s.mu.RLock() defer s.mu.RUnlock() diff --git a/core/pkg/sync-store/sync_store_test.go b/core/pkg/sync-store/sync_store_test.go index 347eebc27..7b424b3a4 100644 --- a/core/pkg/sync-store/sync_store_test.go +++ b/core/pkg/sync-store/sync_store_test.go @@ -441,3 +441,28 @@ func Test_registerSubscriptionResyncPath(t *testing.T) { }) } } + +func Test_syncMetrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncMock := newMockSync() + syncStore.syncBuilder = &syncBuilderMock{ + mock: syncMock, + } + + subs := syncStore.GetActiveSubscriptionsInt64() + if subs != 0 { + t.Error("there are no subscriptions registered, active subs should be 0") + } + + target := "test-target" + syncHandler, _ := newSyncHandler() + + syncStore.syncHandlers[target] = syncHandler + + subs = syncStore.GetActiveSubscriptionsInt64() + if subs != 1 { + t.Error("active subs metric should equal 1") + } +} From 7e1e4804c80f42665d2d28b4f304e2a54dc45fe8 Mon Sep 17 00:00:00 2001 From: James Milligan Date: Mon, 27 Mar 2023 10:30:52 +0100 Subject: [PATCH 8/8] remove unused context Signed-off-by: James Milligan --- core/pkg/service/sync/server.go | 4 +--- core/pkg/service/sync/sync_metrics.go | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/core/pkg/service/sync/server.go b/core/pkg/service/sync/server.go index 9f6fac572..36fe52218 100644 --- a/core/pkg/service/sync/server.go +++ b/core/pkg/service/sync/server.go @@ -62,9 +62,7 @@ func (s *Server) Serve(ctx context.Context, svcConf iservice.Configuration) erro } return nil }) - g.Go(func() error { - return s.captureMetrics(ctx) - }) + g.Go(s.captureMetrics) err := g.Wait() if err != nil { diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go index d196b205d..be8f42325 100644 --- a/core/pkg/service/sync/sync_metrics.go +++ b/core/pkg/service/sync/sync_metrics.go @@ -13,7 +13,7 @@ const ( serviceName = "openfeature/kube-flagd-proxy" ) -func (s *Server) captureMetrics(ctx context.Context) error { +func (s *Server) captureMetrics() error { exporter, err := prometheus.New() if err != nil { return err @@ -37,6 +37,5 @@ func (s *Server) captureMetrics(ctx context.Context) error { return err } - <-ctx.Done() return nil }