Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
github.com/stretchr/testify v1.4.0
github.com/thanos-io/thanos v0.12.3-0.20200505050643-5a777da8dfd2
github.com/uber/jaeger-client-go v2.20.1+incompatible
github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd
github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738
go.uber.org/atomic v1.5.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,8 @@ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/weaveworks/common v0.0.0-20200206153930-760e36ae819a/go.mod h1:6enWAqfQBFrE8X/XdJwZr8IKgh1chStuFR0mjU/UOUw=
github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd h1:F6fe85b7l6qZhtMzuPDGeVSK05XAY3Jz8/MI3NR3lDM=
github.com/weaveworks/common v0.0.0-20200429090833-ac38719f57dd/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY=
github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b h1:L71lFsAt9d9AAbVx4kHfthOuVi5tBfjhAN0rQHDEKnU=
github.com/weaveworks/common v0.0.0-20200511094620-c4a9ff77246b/go.mod h1:c98fKi5B9u8OsKGiWHLRKus6ToQ1Tubeow44ECO1uxY=
github.com/weaveworks/promrus v1.2.0 h1:jOLf6pe6/vss4qGHjXmGz4oDJQA+AOCqEL3FvvZGz7M=
github.com/weaveworks/promrus v1.2.0/go.mod h1:SaE82+OJ91yqjrE1rsvBWVzNZKcHYFtMUyS1+Ogs/KA=
github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs=
Expand Down
38 changes: 14 additions & 24 deletions pkg/cortex/cortex.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
prom_storage "github.com/prometheus/prometheus/storage"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/signals"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v2"
Expand Down Expand Up @@ -288,13 +289,7 @@ func (t *Cortex) initModuleServices() (map[ModuleName]services.Service, error) {

var serv services.Service

if mod.service != nil {
s, err := mod.service(t)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
}
serv = s
} else if mod.wrappedService != nil {
if mod.wrappedService != nil {
s, err := mod.wrappedService(t)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
Expand Down Expand Up @@ -356,30 +351,25 @@ func (t *Cortex) Run() error {

sm.AddListener(services.NewManagerListener(healthy, stopped, serviceFailed))

// Currently it's the Server that reacts on signal handler,
// so get Server service, and wait until it gets to Stopping state.
// It will also be stopped via service manager if any service fails (see attached service listener)
// Attach listener before starting services, or we may miss the notification.
serverStopping := make(chan struct{})
t.ServiceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) {
close(serverStopping)
}, nil, nil))
// Setup signal handler. If signal arrives, we stop the manager, which stops all the services.
handler := signals.NewHandler(t.Server.Log)
go func() {
handler.Loop()
sm.StopAsync()
}()

// Start all services. This can really only fail if some service is already
// in other state than New, which should not be the case.
err = sm.StartAsync(context.Background())
if err == nil {
// no error starting the services, now let's just wait until Server module
// transitions to Stopping (after SIGTERM or when some service fails),
// and then initiate shutdown
<-serverStopping
// Wait until service manager stops. It can stop in two ways:
// 1) Signal is received and manager is stopped.
// 2) Any service fails.
err = sm.AwaitStopped(context.Background())
}

// Stop all the services, and wait until they are all done.
// We don't care about this error, as it cannot really fail.
_ = services.StopManagerAndAwaitStopped(context.Background(), sm)

// if any service failed, report that as an error to caller
// If there is no error yet (= service manager started and then stopped without problems),
// but any service failed, report that failure as an error to caller.
if err == nil {
if failed := sm.ServicesByState()[services.Failed]; len(failed) > 0 {
for _, f := range failed {
Expand Down
12 changes: 4 additions & 8 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (t *Cortex) initAPI() (services.Service, error) {
}

func (t *Cortex) initServer() (services.Service, error) {
// Cortex handles signals on its own.
DisableSignalHandling(&t.Cfg.Server)
serv, err := server.New(t.Cfg.Server)
if err != nil {
return nil, err
Expand Down Expand Up @@ -516,20 +518,14 @@ func (t *Cortex) initDataPurger() (services.Service, error) {
type module struct {
deps []ModuleName

// service for this module (can return nil)
service func(t *Cortex) (services.Service, error)

// service that will be wrapped into moduleServiceWrapper, to wait for dependencies to start / end
// Service that will be wrapped into moduleServiceWrapper, to wait for dependencies to start / end
// (can return nil)
wrappedService func(t *Cortex) (services.Service, error)
}

var modules = map[ModuleName]module{
Server: {
// we cannot use 'wrappedService', as stopped Server service is currently a signal to Cortex
// that it should shutdown. If we used wrappedService, it wouldn't stop until
// all services that depend on it stopped first... but there is nothing that would make them stop.
service: (*Cortex).initServer,
wrappedService: (*Cortex).initServer,
},

API: {
Expand Down
20 changes: 18 additions & 2 deletions pkg/cortex/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cortex

import (
"context"
"fmt"

"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/server"
Expand All @@ -14,6 +15,7 @@ import (
// servicesToWaitFor is called when server is stopping, and should return all
// services that need to terminate before server actually stops.
// N.B.: this function is NOT Cortex specific, please let's keep it that way.
// Passed server should not react on signals. Early return from Run function is considered to be an error.
func NewServerService(serv *server.Server, servicesToWaitFor func() []services.Service) services.Service {
serverDone := make(chan error, 1)

Expand All @@ -28,9 +30,9 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S
return nil
case err := <-serverDone:
if err != nil {
level.Error(util.Logger).Log("msg", "server failed", "err", err)
return err
}
return err
return fmt.Errorf("server stopped unexpectedly")
}
}

Expand All @@ -51,3 +53,17 @@ func NewServerService(serv *server.Server, servicesToWaitFor func() []services.S

return services.NewBasicService(nil, runFn, stoppingFn)
}

func DisableSignalHandling(config *server.Config) {
config.SignalHandler = make(ignoreSignalHandler)
}

type ignoreSignalHandler chan struct{}

func (dh ignoreSignalHandler) Loop() {
<-dh
}

func (dh ignoreSignalHandler) Stop() {
close(dh)
}
9 changes: 6 additions & 3 deletions pkg/cortex/server_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ func TestServerStopViaShutdown(t *testing.T) {
s := NewServerService(serv, func() []services.Service { return nil })
require.NoError(t, services.StartAndAwaitRunning(context.Background(), s))

// we stop HTTP/gRPC Servers here... that should make server stop.
// Shutting down HTTP/gRPC servers makes Server stop, but ServerService doesn't expect that to happen.
serv.Shutdown()

require.NoError(t, s.AwaitTerminated(context.Background()))
require.Error(t, s.AwaitTerminated(context.Background()))
require.Equal(t, services.Failed, s.State())
}

func TestServerStopViaStop(t *testing.T) {
Expand All @@ -69,5 +70,7 @@ func TestServerStopViaStop(t *testing.T) {

serv.Stop()

require.NoError(t, s.AwaitTerminated(context.Background()))
// Stop makes Server stop, but ServerService doesn't expect that to happen.
require.Error(t, s.AwaitTerminated(context.Background()))
require.Equal(t, services.Failed, s.State())
}
3 changes: 2 additions & 1 deletion pkg/util/module_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@ func (w *moduleService) stop(_ error) error {
func (w *moduleService) waitForModulesToStop() {
// wait until all stopDeps have stopped
stopDeps := w.stopDeps(w.name)
for _, s := range stopDeps {
for n, s := range stopDeps {
if s == nil {
continue
}

level.Debug(Logger).Log("msg", "module waiting for", "module", w.name, "waiting_for", n)
// Passed context isn't canceled, so we can only get error here, if service
// fails. But we don't care *how* service stops, as long as it is done.
_ = s.AwaitTerminated(context.Background())
Expand Down
24 changes: 21 additions & 3 deletions vendor/github.com/weaveworks/common/server/server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.