From 44e87322f9f6da0a396ec3e55b352c1d69317e26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 11 May 2020 13:13:49 +0200 Subject: [PATCH 1/5] Update weaveworks/common MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- go.mod | 2 +- go.sum | 4 ++-- .../weaveworks/common/server/server.go | 24 ++++++++++++++++--- vendor/modules.txt | 2 +- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index ba3b1b437c7..135c36e07e4 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/stretchr/testify v1.4.0 github.com/thanos-io/thanos v0.12.3-0.20200505050643-5a777da8dfd2 github.com/uber/jaeger-client-go v2.20.1+incompatible - github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd + github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b go.etcd.io/bbolt v1.3.3 go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 go.uber.org/atomic v1.5.1 diff --git a/go.sum b/go.sum index 29e1e92fadd..03600acb9b7 100644 --- a/go.sum +++ b/go.sum @@ -887,8 +887,8 @@ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6 github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAqfQBFrE8X/XdJwZr8IKgh1chStuFR0mjU/UOUw= -github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd h1:F6fe85b7l6qZhtMzuPDGeVSK05XAY3Jz8/MI3NR3lDM= -github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY= +github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b h1:L71lFsAt9d9AAbVx4kHfthOuVi5tBfjhAN0rQHDEKnU= +github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY= github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M= github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA= github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs= diff --git a/vendor/github.com/weaveworks/common/server/server.go b/vendor/github.com/weaveworks/common/server/server.go index 78204bcef05..9fb23820a50 100644 --- a/vendor/github.com/weaveworks/common/server/server.go +++ b/vendor/github.com/weaveworks/common/server/server.go @@ -31,6 +31,16 @@ import ( "github.com/weaveworks/common/signals" ) +// SignalHandler used by Server. +type SignalHandler interface { + // Starts the signals handler. This method is blocking, and returns only after signal is received, + // or "Stop" is called. + Loop() + + // Stop blocked "Loop" method. + Stop() +} + // Config for a Server type Config struct { MetricsNamespace string `yaml:"-"` @@ -69,6 +79,9 @@ type Config struct { LogLevel logging.Level `yaml:"log_level"` Log logging.Interface `yaml:"-"` + // If not set, default signal handler is used. + SignalHandler SignalHandler `yaml:"-"` + PathPrefix string `yaml:"http_path_prefix"` } @@ -112,7 +125,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Servers will be automatically instrumented for Prometheus metrics. type Server struct { cfg Config - handler *signals.Handler + handler SignalHandler grpcListener net.Listener httpListener net.Listener @@ -258,11 +271,16 @@ func New(cfg Config) (*Server, error) { httpServer.TLSConfig = httpTLSConfig } + handler := cfg.SignalHandler + if handler == nil { + handler = signals.NewHandler(log) + } + return &Server{ cfg: cfg, httpListener: httpListener, grpcListener: grpcListener, - handler: signals.NewHandler(log), + handler: handler, HTTP: router, HTTPServer: httpServer, @@ -277,7 +295,7 @@ func RegisterInstrumentation(router *mux.Router) { router.PathPrefix("/debug/pprof").Handler(http.DefaultServeMux) } -// Run the server; blocks until SIGTERM or an error is received. +// Run the server; blocks until SIGTERM (if signal handling is enabled), an error is received, or Stop() is called. func (s *Server) Run() error { errChan := make(chan error, 1) diff --git a/vendor/modules.txt b/vendor/modules.txt index afa05873f3f..d6147bb1650 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -576,7 +576,7 @@ github.com/uber/jaeger-client-go/utils # github.com/uber/jaeger-lib v2.2.0+incompatible github.com/uber/jaeger-lib/metrics github.com/uber/jaeger-lib/metrics/prometheus -# github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd +# github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b github.com/weaveworks/common/aws github.com/weaveworks/common/errors github.com/weaveworks/common/grpc From 63bbd010f7629a3465d2179dbba2edfea38c46e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 11 May 2020 14:07:31 +0200 Subject: [PATCH 2/5] Moved signal handling out of Server module. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This simplifies module service setup, we no longer need to make a distinction between services when using the wrapper or not. Signed-off-by: Peter Štibraný --- pkg/cortex/cortex.go | 37 +++++++++++++----------------------- pkg/cortex/modules.go | 12 ++++-------- pkg/cortex/server_service.go | 20 +++++++++++++++++-- pkg/util/module_service.go | 3 ++- 4 files changed, 37 insertions(+), 35 deletions(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index f8114720d2f..947e2e0eebe 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -14,6 +14,7 @@ import ( prom_storage "github.com/prometheus/prometheus/storage" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" + "github.com/weaveworks/common/signals" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "gopkg.in/yaml.v2" @@ -288,13 +289,7 @@ func (t *Cortex) initModuleServices() (map[ModuleName]services.Service, error) { var serv services.Service - if mod.service != nil { - s, err := mod.service(t) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) - } - serv = s - } else if mod.wrappedService != nil { + if mod.wrappedService != nil { s, err := mod.wrappedService(t) if err != nil { return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) @@ -356,30 +351,24 @@ func (t *Cortex) Run() error { sm.AddListener(services.NewManagerListener(healthy, stopped, serviceFailed)) - // Currently it's the Server that reacts on signal handler, - // so get Server service, and wait until it gets to Stopping state. - // It will also be stopped via service manager if any service fails (see attached service listener) - // Attach listener before starting services, or we may miss the notification. - serverStopping := make(chan struct{}) - t.ServiceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) { - close(serverStopping) - }, nil, nil)) + // Setup signal handler. If signal arrives, we stop the manager, which stops all the services. + handler := signals.NewHandler(t.Server.Log) + go func() { + handler.Loop() + sm.StopAsync() + }() // Start all services. This can really only fail if some service is already // in other state than New, which should not be the case. err = sm.StartAsync(context.Background()) if err == nil { - // no error starting the services, now let's just wait until Server module - // transitions to Stopping (after SIGTERM or when some service fails), - // and then initiate shutdown - <-serverStopping + // Wait until service manager stops. It can stop in two ways: + // 1) Signal is received and manager is stopped. + // 2) Any service fails. + err = sm.AwaitStopped(context.Background()) } - // Stop all the services, and wait until they are all done. - // We don't care about this error, as it cannot really fail. - _ = services.StopManagerAndAwaitStopped(context.Background(), sm) - - // if any service failed, report that as an error to caller + // If any service failed, report that as an error to caller if err == nil { if failed := sm.ServicesByState()[services.Failed]; len(failed) > 0 { for _, f := range failed { diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f86744c639d..e298a1f41c2 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -107,6 +107,8 @@ func (t *Cortex) initAPI() (services.Service, error) { } func (t *Cortex) initServer() (services.Service, error) { + // Cortex handles signals on its own. + DisableSignalHandling(&t.Cfg.Server) serv, err := server.New(t.Cfg.Server) if err != nil { return nil, err @@ -516,20 +518,14 @@ func (t *Cortex) initDataPurger() (services.Service, error) { type module struct { deps []ModuleName - // service for this module (can return nil) - service func(t *Cortex) (services.Service, error) - - // service that will be wrapped into moduleServiceWrapper, to wait for dependencies to start / end + // Service that will be wrapped into moduleServiceWrapper, to wait for dependencies to start / end // (can return nil) wrappedService func(t *Cortex) (services.Service, error) } var modules = map[ModuleName]module{ Server: { - // we cannot use 'wrappedService', as stopped Server service is currently a signal to Cortex - // that it should shutdown. If we used wrappedService, it wouldn't stop until - // all services that depend on it stopped first... but there is nothing that would make them stop. - service: (*Cortex).initServer, + wrappedService: (*Cortex).initServer, }, API: { diff --git a/pkg/cortex/server_service.go b/pkg/cortex/server_service.go index a16f67ac5c6..3bc5010c019 100644 --- a/pkg/cortex/server_service.go +++ b/pkg/cortex/server_service.go @@ -2,6 +2,7 @@ package cortex import ( "context" + "fmt" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/server" @@ -14,6 +15,7 @@ import ( // servicesToWaitFor is called when server is stopping, and should return all // services that need to terminate before server actually stops. // N.B.: this function is NOT Cortex specific, please let's keep it that way. +// Passed server should not react on signals. Early return from Run function is considered to be an error. func NewServerService(serv *server.Server, servicesToWaitFor func() []services.Service) services.Service { serverDone := make(chan error, 1) @@ -28,9 +30,9 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S return nil case err := <-serverDone: if err != nil { - level.Error(util.Logger).Log("msg", "server failed", "err", err) + return err } - return err + return fmt.Errorf("server stopped unexpectedly") } } @@ -51,3 +53,17 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S return services.NewBasicService(nil, runFn, stoppingFn) } + +func DisableSignalHandling(config *server.Config) { + config.SignalHandler = make(dummySignalHandler) +} + +type dummySignalHandler chan struct{} + +func (dh dummySignalHandler) Loop() { + <-dh +} + +func (dh dummySignalHandler) Stop() { + close(dh) +} diff --git a/pkg/util/module_service.go b/pkg/util/module_service.go index 32cfbd07992..870facdfa9a 100644 --- a/pkg/util/module_service.go +++ b/pkg/util/module_service.go @@ -95,11 +95,12 @@ func (w *moduleService) stop(_ error) error { func (w *moduleService) waitForModulesToStop() { // wait until all stopDeps have stopped stopDeps := w.stopDeps(w.name) - for _, s := range stopDeps { + for n, s := range stopDeps { if s == nil { continue } + level.Debug(Logger).Log("msg", "module waiting for", "module", w.name, "other_module", n) // Passed context isn't canceled, so we can only get error here, if service // fails. But we don't care *how* service stops, as long as it is done. _ = s.AwaitTerminated(context.Background()) From 25748530c60d2807e1e9c3fde45ad5c530cad718 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 11 May 2020 14:34:25 +0200 Subject: [PATCH 3/5] Updated tests to match updated logic. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cortex/server_service_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/cortex/server_service_test.go b/pkg/cortex/server_service_test.go index 8d5249bbf17..f199522696a 100644 --- a/pkg/cortex/server_service_test.go +++ b/pkg/cortex/server_service_test.go @@ -47,10 +47,11 @@ func TestServerStopViaShutdown(t *testing.T) { s := NewServerService(serv, func() []services.Service { return nil }) require.NoError(t, services.StartAndAwaitRunning(context.Background(), s)) - // we stop HTTP/gRPC Servers here... that should make server stop. + // Shutting down HTTP/gRPC servers makes Server stop, but ServerService doesn't expect that to happen. serv.Shutdown() - require.NoError(t, s.AwaitTerminated(context.Background())) + require.Error(t, s.AwaitTerminated(context.Background())) + require.Equal(t, services.Failed, s.State()) } func TestServerStopViaStop(t *testing.T) { @@ -69,5 +70,7 @@ func TestServerStopViaStop(t *testing.T) { serv.Stop() - require.NoError(t, s.AwaitTerminated(context.Background())) + // Stop makes Server stop, but ServerService doesn't expect that to happen. + require.Error(t, s.AwaitTerminated(context.Background())) + require.Equal(t, services.Failed, s.State()) } From 83fb34568c8966c168f906cd33f5a50603636e79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 14 May 2020 11:28:33 +0200 Subject: [PATCH 4/5] Review feedback. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cortex/server_service.go | 8 ++++---- pkg/util/module_service.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/cortex/server_service.go b/pkg/cortex/server_service.go index 3bc5010c019..c5a84942e81 100644 --- a/pkg/cortex/server_service.go +++ b/pkg/cortex/server_service.go @@ -55,15 +55,15 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S } func DisableSignalHandling(config *server.Config) { - config.SignalHandler = make(dummySignalHandler) + config.SignalHandler = make(ignoreSignalHandler) } -type dummySignalHandler chan struct{} +type ignoreSignalHandler chan struct{} -func (dh dummySignalHandler) Loop() { +func (dh ignoreSignalHandler) Loop() { <-dh } -func (dh dummySignalHandler) Stop() { +func (dh ignoreSignalHandler) Stop() { close(dh) } diff --git a/pkg/util/module_service.go b/pkg/util/module_service.go index 870facdfa9a..be12157ac26 100644 --- a/pkg/util/module_service.go +++ b/pkg/util/module_service.go @@ -100,7 +100,7 @@ func (w *moduleService) waitForModulesToStop() { continue } - level.Debug(Logger).Log("msg", "module waiting for", "module", w.name, "other_module", n) + level.Debug(Logger).Log("msg", "module waiting for", "module", w.name, "waiting_for", n) // Passed context isn't canceled, so we can only get error here, if service // fails. But we don't care *how* service stops, as long as it is done. _ = s.AwaitTerminated(context.Background()) From b0d4d0e0767de7506b90bb46a0223c253e95d227 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Thu, 14 May 2020 11:30:21 +0200 Subject: [PATCH 5/5] Update comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/cortex/cortex.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 947e2e0eebe..7fef93c1c49 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -368,7 +368,8 @@ func (t *Cortex) Run() error { err = sm.AwaitStopped(context.Background()) } - // If any service failed, report that as an error to caller + // If there is no error yet (= service manager started and then stopped without problems), + // but any service failed, report that failure as an error to caller. if err == nil { if failed := sm.ServicesByState()[services.Failed]; len(failed) > 0 { for _, f := range failed {