diff --git a/core/pkg/service/sync/server.go b/core/pkg/service/sync/server.go index a7d7de993..36fe52218 100644 --- a/core/pkg/service/sync/server.go +++ b/core/pkg/service/sync/server.go @@ -12,6 +12,7 @@ import ( "github.com/open-feature/flagd/core/pkg/logger" iservice "github.com/open-feature/flagd/core/pkg/service" syncStore "github.com/open-feature/flagd/core/pkg/sync-store" + "github.com/prometheus/client_golang/prometheus/promhttp" "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "golang.org/x/sync/errgroup" @@ -61,6 +62,7 @@ func (s *Server) Serve(ctx context.Context, svcConf iservice.Configuration) erro } return nil }) + g.Go(s.captureMetrics) err := g.Wait() if err != nil { @@ -111,6 +113,8 @@ func (s *Server) startMetricsServer() error { } else { w.WriteHeader(http.StatusPreconditionFailed) } + case "/metrics": + promhttp.Handler().ServeHTTP(w, r) default: w.WriteHeader(http.StatusNotFound) } diff --git a/core/pkg/service/sync/sync_metrics.go b/core/pkg/service/sync/sync_metrics.go new file mode 100644 index 000000000..be8f42325 --- /dev/null +++ b/core/pkg/service/sync/sync_metrics.go @@ -0,0 +1,41 @@ +package sync + +import ( + "context" + + "go.opentelemetry.io/otel/exporters/prometheus" + api "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/sdk/metric" +) + +const ( + serviceName = "openfeature/kube-flagd-proxy" +) + +func (s *Server) captureMetrics() error { + exporter, err := prometheus.New() + if err != nil { + return err + } + provider := metric.NewMeterProvider(metric.WithReader(exporter)) + meter := provider.Meter(serviceName) + + syncGuage, err := meter.Int64ObservableGauge( + "sync_active_streams", + instrument.WithDescription("number of open sync subscriptions"), + ) + if err != nil { + return err + } + + _, err = meter.RegisterCallback(func(_ context.Context, o api.Observer) error { + o.ObserveInt64(syncGuage, s.handler.syncStore.GetActiveSubscriptionsInt64()) + return nil + }, syncGuage) + if err != nil { + return err + } + + return nil +} diff --git a/core/pkg/sync-store/sync_store.go b/core/pkg/sync-store/sync_store.go index eac416bb4..0f641de04 100644 --- a/core/pkg/sync-store/sync_store.go +++ b/core/pkg/sync-store/sync_store.go @@ -253,6 +253,18 @@ func (s *SyncStore) cleanup() { } } +func (s *SyncStore) GetActiveSubscriptionsInt64() int64 { + s.mu.RLock() + defer s.mu.RUnlock() + + syncs := 0 + for _, v := range s.syncHandlers { + syncs += len(v.subs) + } + + return int64(syncs) +} + type SyncBuilderInterface interface { SyncFromURI(uri string, logger *logger.Logger) (isync.ISync, error) } diff --git a/core/pkg/sync-store/sync_store_test.go b/core/pkg/sync-store/sync_store_test.go index 347eebc27..7b424b3a4 100644 --- a/core/pkg/sync-store/sync_store_test.go +++ b/core/pkg/sync-store/sync_store_test.go @@ -441,3 +441,28 @@ func Test_registerSubscriptionResyncPath(t *testing.T) { }) } } + +func Test_syncMetrics(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + syncStore := NewSyncStore(ctx, logger.NewLogger(nil, false)) + syncMock := newMockSync() + syncStore.syncBuilder = &syncBuilderMock{ + mock: syncMock, + } + + subs := syncStore.GetActiveSubscriptionsInt64() + if subs != 0 { + t.Error("there are no subscriptions registered, active subs should be 0") + } + + target := "test-target" + syncHandler, _ := newSyncHandler() + + syncStore.syncHandlers[target] = syncHandler + + subs = syncStore.GetActiveSubscriptionsInt64() + if subs != 1 { + t.Error("active subs metric should equal 1") + } +}