From 89f641e794a64ec3fee5f3d140a25e723634b580 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 27 Mar 2023 09:50:08 +0200 Subject: [PATCH 1/5] chore: move startServer functions into errGroups Signed-off-by: Florian Bacher --- .../flag-evaluation/connect_service.go | 96 ++++++++++++------- 1 file changed, 60 insertions(+), 36 deletions(-) diff --git a/core/pkg/service/flag-evaluation/connect_service.go b/core/pkg/service/flag-evaluation/connect_service.go index 3bdd642bb..86019c5e2 100644 --- a/core/pkg/service/flag-evaluation/connect_service.go +++ b/core/pkg/service/flag-evaluation/connect_service.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "golang.org/x/sync/errgroup" "net" "net/http" "sync" @@ -31,7 +32,8 @@ type ConnectService struct { Eval eval.IEvaluator Metrics *otel.MetricsRecorder eventingConfiguration *eventingConfiguration - server http.Server + server *http.Server + metricsServer *http.Server } func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf service.Configuration) error { @@ -40,40 +42,38 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon subs: make(map[interface{}]chan service.Notification), mu: &sync.RWMutex{}, } - lis, err := s.setupServer(svcConf) - if err != nil { - return err - } - errChan := make(chan error, 1) - go func() { - s.Logger.Info(fmt.Sprintf("Flag Evaluation listening at %s", lis.Addr())) - if svcConf.CertPath != "" && svcConf.KeyPath != "" { - if err := s.server.ServeTLS( - lis, - svcConf.CertPath, - svcConf.KeyPath, - ); err != nil && !errors.Is(err, http.ErrServerClosed) { - errChan <- err + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + return s.startServer(svcConf) + }) + g.Go(func() error { + return s.startMetricsServer(svcConf) + }) + g.Go(func() error { + <-gCtx.Done() + if s.server != nil { + if err := s.server.Shutdown(gCtx); err != nil { + return err } - } else { - if err := s.server.Serve( - lis, - ); err != nil && !errors.Is(err, http.ErrServerClosed) { - errChan <- err + } + return nil + }) + g.Go(func() error { + <-gCtx.Done() + if s.metricsServer != nil { + if err := s.metricsServer.Shutdown(gCtx); err != nil { + return err } } - close(errChan) - }() - - go s.startMetricsServer(svcConf) - - select { - case err := <-errChan: + return nil + }) + err := g.Wait() + if err != nil { return err - case <-ctx.Done(): - return s.server.Shutdown(ctx) } + return nil } func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listener, error) { @@ -97,7 +97,7 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene path, handler := schemaConnectV1.NewServiceHandler(fes) mux.Handle(path, handler) - s.server = http.Server{ + s.server = &http.Server{ ReadHeaderTimeout: time.Second, Handler: handler, } @@ -136,13 +136,37 @@ func (s *ConnectService) Notify(n service.Notification) { } } -func (s *ConnectService) startMetricsServer(svcConf service.Configuration) { +func (s *ConnectService) startServer(svcConf service.Configuration) error { + lis, err := s.setupServer(svcConf) + if err != nil { + return err + } + s.Logger.Info(fmt.Sprintf("Flag Evaluation listening at %s", lis.Addr())) + if svcConf.CertPath != "" && svcConf.KeyPath != "" { + if err := s.server.ServeTLS( + lis, + svcConf.CertPath, + svcConf.KeyPath, + ); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + } else { + if err := s.server.Serve( + lis, + ); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + } + return nil +} + +func (s *ConnectService) startMetricsServer(svcConf service.Configuration) error { s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", svcConf.MetricsPort)) - server := &http.Server{ + s.metricsServer = &http.Server{ Addr: fmt.Sprintf(":%d", svcConf.MetricsPort), ReadHeaderTimeout: 3 * time.Second, } - server.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.metricsServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/healthz": w.WriteHeader(http.StatusOK) @@ -158,8 +182,8 @@ func (s *ConnectService) startMetricsServer(svcConf service.Configuration) { w.WriteHeader(http.StatusNotFound) } }) - err := server.ListenAndServe() - if err != nil { - panic(err) + if err := s.metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err } + return nil } From 895b5ecd2faa61c2eb8d46276a3a5785a4b6d012 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 27 Mar 2023 11:05:25 +0200 Subject: [PATCH 2/5] added mutexes to avoid data race Signed-off-by: Florian Bacher --- core/pkg/service/flag-evaluation/connect_service.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/pkg/service/flag-evaluation/connect_service.go b/core/pkg/service/flag-evaluation/connect_service.go index 86019c5e2..e46608199 100644 --- a/core/pkg/service/flag-evaluation/connect_service.go +++ b/core/pkg/service/flag-evaluation/connect_service.go @@ -34,6 +34,9 @@ type ConnectService struct { eventingConfiguration *eventingConfiguration server *http.Server metricsServer *http.Server + + serverMtx sync.RWMutex + metricsServerMtx sync.RWMutex } func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcConf service.Configuration) error { @@ -53,6 +56,8 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon }) g.Go(func() error { <-gCtx.Done() + s.serverMtx.RLock() + defer s.serverMtx.RUnlock() if s.server != nil { if err := s.server.Shutdown(gCtx); err != nil { return err @@ -62,6 +67,8 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon }) g.Go(func() error { <-gCtx.Done() + s.metricsServerMtx.RLock() + defer s.metricsServerMtx.RUnlock() if s.metricsServer != nil { if err := s.metricsServer.Shutdown(gCtx); err != nil { return err @@ -97,10 +104,12 @@ func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listene path, handler := schemaConnectV1.NewServiceHandler(fes) mux.Handle(path, handler) + s.serverMtx.Lock() s.server = &http.Server{ ReadHeaderTimeout: time.Second, Handler: handler, } + s.serverMtx.Unlock() // Add middlewares @@ -162,10 +171,12 @@ func (s *ConnectService) startServer(svcConf service.Configuration) error { func (s *ConnectService) startMetricsServer(svcConf service.Configuration) error { s.Logger.Info(fmt.Sprintf("metrics and probes listening at %d", svcConf.MetricsPort)) + s.metricsServerMtx.Lock() s.metricsServer = &http.Server{ Addr: fmt.Sprintf(":%d", svcConf.MetricsPort), ReadHeaderTimeout: 3 * time.Second, } + s.metricsServerMtx.Unlock() s.metricsServer.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/healthz": From 964843102fa80ff0c460265b61ef954155f4cf27 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Mon, 27 Mar 2023 11:30:49 +0200 Subject: [PATCH 3/5] fix linting errors Signed-off-by: Florian Bacher --- core/pkg/service/flag-evaluation/connect_service.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/pkg/service/flag-evaluation/connect_service.go b/core/pkg/service/flag-evaluation/connect_service.go index e46608199..6929d4ec6 100644 --- a/core/pkg/service/flag-evaluation/connect_service.go +++ b/core/pkg/service/flag-evaluation/connect_service.go @@ -5,12 +5,13 @@ import ( "context" "errors" "fmt" - "golang.org/x/sync/errgroup" "net" "net/http" "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/open-feature/flagd/core/pkg/service/middleware" schemaConnectV1 "buf.build/gen/go/open-feature/flagd/bufbuild/connect-go/schema/v1/schemav1connect" From c2f9896fc7f145cb4d14b5bd3c26cda2c3533fde Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Tue, 28 Mar 2023 15:22:09 +0200 Subject: [PATCH 4/5] pr review Signed-off-by: Florian Bacher --- core/pkg/service/flag-evaluation/connect_service.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/pkg/service/flag-evaluation/connect_service.go b/core/pkg/service/flag-evaluation/connect_service.go index 6929d4ec6..d5e424bfa 100644 --- a/core/pkg/service/flag-evaluation/connect_service.go +++ b/core/pkg/service/flag-evaluation/connect_service.go @@ -77,8 +77,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon } return nil }) - err := g.Wait() - if err != nil { + if err := g.Wait(); err != nil { return err } return nil From fa6690e028e374368f04642dbd1d7f317db73384 Mon Sep 17 00:00:00 2001 From: Florian Bacher Date: Wed, 29 Mar 2023 09:15:50 +0200 Subject: [PATCH 5/5] fix lint error Signed-off-by: Florian Bacher --- core/pkg/service/flag-evaluation/connect_service.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/core/pkg/service/flag-evaluation/connect_service.go b/core/pkg/service/flag-evaluation/connect_service.go index d5e424bfa..461623010 100644 --- a/core/pkg/service/flag-evaluation/connect_service.go +++ b/core/pkg/service/flag-evaluation/connect_service.go @@ -77,10 +77,7 @@ func (s *ConnectService) Serve(ctx context.Context, eval eval.IEvaluator, svcCon } return nil }) - if err := g.Wait(); err != nil { - return err - } - return nil + return g.Wait() } func (s *ConnectService) setupServer(svcConf service.Configuration) (net.Listener, error) {