From 4f1373cfdabcea483936723493dd7032b1dd3769 Mon Sep 17 00:00:00 2001 From: Annanay Date: Sat, 18 Apr 2020 00:54:23 +0530 Subject: [PATCH 01/11] Generalise module service into separate package Signed-off-by: Annanay --- pkg/cortex/cortex.go | 196 ++---- pkg/cortex/cortex_test.go | 2 +- pkg/cortex/modules.go | 557 ++++++++---------- .../modules}/module_service_wrapper.go | 10 +- pkg/util/modules/modules.go | 158 +++++ pkg/util/modules/modules_test.go | 40 ++ 6 files changed, 486 insertions(+), 477 deletions(-) rename pkg/{cortex => util/modules}/module_service_wrapper.go (63%) create mode 100644 pkg/util/modules/modules.go create mode 100644 pkg/util/modules/modules_test.go diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index d441967e1f5..d7dac5bb0fe 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -44,6 +44,7 @@ import ( "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/grpc/healthcheck" + "github.com/cortexproject/cortex/pkg/util/modules" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -68,10 +69,10 @@ import ( // Config is the root config for Cortex. type Config struct { - Target ModuleName `yaml:"target"` - AuthEnabled bool `yaml:"auth_enabled"` - PrintConfig bool `yaml:"-"` - HTTPPrefix string `yaml:"http_prefix"` + Target string `yaml:"target"` + AuthEnabled bool `yaml:"auth_enabled"` + PrintConfig bool `yaml:"-"` + HTTPPrefix string `yaml:"http_prefix"` API api.Config `yaml:"api"` Server server.Config `yaml:"server"` @@ -105,9 +106,8 @@ type Config struct { // RegisterFlags registers flag. func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Server.MetricsNamespace = "cortex" - c.Target = All c.Server.ExcludeRequestInLog = true - f.Var(&c.Target, "target", "The Cortex service to run. Supported values are: all, distributor, ingester, querier, query-frontend, table-manager, ruler, alertmanager, configs.") + f.StringVar(&c.Target, "target", All, "The Cortex service to run. Supported values are: all, distributor, ingester, querier, query-frontend, table-manager, ruler, alertmanager, configs.") f.BoolVar(&c.AuthEnabled, "auth.enabled", true, "Set to false to disable auth.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") f.StringVar(&c.HTTPPrefix, "http.prefix", "/api/prom", "HTTP path prefix for Cortex API.") @@ -179,37 +179,38 @@ func (c *Config) Validate(log log.Logger) error { // Cortex is the root datastructure for Cortex. type Cortex struct { - target ModuleName + Cfg Config // set during initialization - serviceMap map[ModuleName]services.Service - - api *api.API - server *server.Server - ring *ring.Ring - overrides *validation.Overrides - distributor *distributor.Distributor - ingester *ingester.Ingester - flusher *flusher.Flusher - store chunk.Store - deletesStore *purger.DeleteStore - frontend *frontend.Frontend - tableManager *chunk.TableManager - cache cache.Cache - runtimeConfig *runtimeconfig.Manager - dataPurger *purger.DataPurger - - ruler *ruler.Ruler - configAPI *configAPI.API - configDB db.DB - alertmanager *alertmanager.MultitenantAlertmanager - compactor *compactor.Compactor - storeGateway *storegateway.StoreGateway - memberlistKV *memberlist.KVInit + ServiceMap map[string]services.Service + ModuleManager modules.Manager + + API *api.API + Server *server.Server + Ring *ring.Ring + Overrides *validation.Overrides + Distributor *distributor.Distributor + Ingester *ingester.Ingester + Flusher *flusher.Flusher + Store chunk.Store + DeletesStore *purger.DeleteStore + Frontend *frontend.Frontend + TableManager *chunk.TableManager + Cache cache.Cache + RuntimeConfig *runtimeconfig.Manager + DataPurger *purger.DataPurger + + Ruler *ruler.Ruler + ConfigAPI *configAPI.API + ConfigDB db.DB + Alertmanager *alertmanager.MultitenantAlertmanager + Compactor *compactor.Compactor + StoreGateway *storegateway.StoreGateway + MemberlistKV *memberlist.KVInit // Queryable that the querier should use to query the long // term storage. It depends on the storage engine used. - storeQueryable prom_storage.Queryable + StoreQueryable prom_storage.Queryable } // New makes a new Cortex. @@ -222,18 +223,13 @@ func New(cfg Config) (*Cortex, error) { } cortex := &Cortex{ - target: cfg.Target, + Cfg: cfg, } cortex.setupAuthMiddleware(&cfg) - serviceMap, err := cortex.initModuleServices(&cfg, cfg.Target) - if err != nil { - return nil, err - } - - cortex.serviceMap = serviceMap - cortex.api.RegisterServiceMapHandler(http.HandlerFunc(cortex.servicesHandler)) + mm := cortex.createModuleManager() + cortex.ModuleManager = mm return cortex, nil } @@ -269,49 +265,19 @@ func (t *Cortex) setupAuthMiddleware(cfg *Config) { } } -func (t *Cortex) initModuleServices(cfg *Config, target ModuleName) (map[ModuleName]services.Service, error) { - servicesMap := map[ModuleName]services.Service{} - - // initialize all of our dependencies first - deps := orderedDeps(target) - deps = append(deps, target) // lastly, initialize the requested module - - for ix, n := range deps { - mod := modules[n] - - var serv services.Service - - if mod.service != nil { - s, err := mod.service(t, cfg) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) - } - serv = s - } else if mod.wrappedService != nil { - s, err := mod.wrappedService(t, cfg) - if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) - } - if s != nil { - // We pass servicesMap, which isn't yet finished. By the time service starts, - // it will be fully built, so there is no need for extra synchronization. - serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, findInverseDependencies(n, deps[ix+1:])) - } - } - - if serv != nil { - servicesMap[n] = serv - } +// Run starts Cortex running, and blocks until a Cortex stops. +func (t *Cortex) Run() error { + serviceMap, err := t.ModuleManager.StartModule(t.Cfg.Target) + if err != nil { + return err } - return servicesMap, nil -} + t.ServiceMap = serviceMap + t.API.RegisterServiceMapHandler(http.HandlerFunc(t.servicesHandler)) -// Run starts Cortex running, and blocks until a Cortex stops. -func (t *Cortex) Run() error { // get all services, create service manager and tell it to start servs := []services.Service(nil) - for _, s := range t.serviceMap { + for _, s := range t.ServiceMap { servs = append(servs, s) } @@ -322,8 +288,8 @@ func (t *Cortex) Run() error { // before starting servers, register /ready handler and gRPC health check service. // It should reflect entire Cortex. - t.server.HTTP.Path("/ready").Handler(t.readyHandler(sm)) - grpc_health_v1.RegisterHealthServer(t.server.GRPC, healthcheck.New(sm)) + t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm)) + grpc_health_v1.RegisterHealthServer(t.Server.GRPC, healthcheck.New(sm)) // Let's listen for events from this manager, and log them. healthy := func() { level.Info(util.Logger).Log("msg", "Cortex started") } @@ -333,7 +299,7 @@ func (t *Cortex) Run() error { sm.StopAsync() // let's find out which module failed - for m, s := range t.serviceMap { + for m, s := range t.ServiceMap { if s == service { if service.FailureCase() == util.ErrStopProcess { level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase()) @@ -354,7 +320,7 @@ func (t *Cortex) Run() error { // It will also be stopped via service manager if any service fails (see attached service listener) // Attach listener before starting services, or we may miss the notification. serverStopping := make(chan struct{}) - t.serviceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) { + t.ServiceMap[Server].AddListener(services.NewListener(nil, nil, func(from services.State) { close(serverStopping) }, nil, nil)) @@ -404,8 +370,8 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc { // Ingester has a special check that makes sure that it was able to register into the ring, // and that all other ring entries are OK too. - if t.ingester != nil { - if err := t.ingester.CheckReady(r.Context()); err != nil { + if t.Ingester != nil { + if err := t.Ingester.CheckReady(r.Context()); err != nil { http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable) return } @@ -414,65 +380,3 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc { http.Error(w, "ready", http.StatusOK) } } - -// listDeps recursively gets a list of dependencies for a passed moduleName -func listDeps(m ModuleName) []ModuleName { - deps := modules[m].deps - for _, d := range modules[m].deps { - deps = append(deps, listDeps(d)...) - } - return deps -} - -// orderedDeps gets a list of all dependencies ordered so that items are always after any of their dependencies. -func orderedDeps(m ModuleName) []ModuleName { - deps := listDeps(m) - - // get a unique list of moduleNames, with a flag for whether they have been added to our result - uniq := map[ModuleName]bool{} - for _, dep := range deps { - uniq[dep] = false - } - - result := make([]ModuleName, 0, len(uniq)) - - // keep looping through all modules until they have all been added to the result. - - for len(result) < len(uniq) { - OUTER: - for name, added := range uniq { - if added { - continue - } - for _, dep := range modules[name].deps { - // stop processing this module if one of its dependencies has - // not been added to the result yet. - if !uniq[dep] { - continue OUTER - } - } - - // if all of the module's dependencies have been added to the result slice, - // then we can safely add this module to the result slice as well. - uniq[name] = true - result = append(result, name) - } - } - return result -} - -// find modules in the supplied list, that depend on mod -func findInverseDependencies(mod ModuleName, mods []ModuleName) []ModuleName { - result := []ModuleName(nil) - - for _, n := range mods { - for _, d := range modules[n].deps { - if d == mod { - result = append(result, n) - break - } - } - } - - return result -} diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index dee9bbad728..8795c3d8b25 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -66,5 +66,5 @@ func TestCortex(t *testing.T) { require.NotNil(t, c.serviceMap[Distributor]) // check that findInverseDependencie for Ring -- querier and distributor depend on Ring, so should be returned. - require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps)) + // require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps)) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index adaeb89b348..dbb582ba573 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -3,7 +3,6 @@ package cortex import ( "fmt" "os" - "strings" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -31,92 +30,65 @@ import ( "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/modules" "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" ) -// ModuleName is used to describe a running module -type ModuleName string - // The various modules that make up Cortex. const ( - API ModuleName = "api" - Ring ModuleName = "ring" - RuntimeConfig ModuleName = "runtime-config" - Overrides ModuleName = "overrides" - Server ModuleName = "server" - Distributor ModuleName = "distributor" - Ingester ModuleName = "ingester" - Flusher ModuleName = "flusher" - Querier ModuleName = "querier" - StoreQueryable ModuleName = "store-queryable" - QueryFrontend ModuleName = "query-frontend" - Store ModuleName = "store" - DeleteRequestsStore ModuleName = "delete-requests-store" - TableManager ModuleName = "table-manager" - Ruler ModuleName = "ruler" - Configs ModuleName = "configs" - AlertManager ModuleName = "alertmanager" - Compactor ModuleName = "compactor" - StoreGateway ModuleName = "store-gateway" - MemberlistKV ModuleName = "memberlist-kv" - DataPurger ModuleName = "data-purger" - All ModuleName = "all" + API string = "api" + Ring string = "ring" + RuntimeConfig string = "runtime-config" + Overrides string = "overrides" + Server string = "server" + Distributor string = "distributor" + Ingester string = "ingester" + Flusher string = "flusher" + Querier string = "querier" + StoreQueryable string = "store-queryable" + QueryFrontend string = "query-frontend" + Store string = "store" + DeleteRequestsStore string = "delete-requests-store" + TableManager string = "table-manager" + Ruler string = "ruler" + Configs string = "configs" + AlertManager string = "alertmanager" + Compactor string = "compactor" + StoreGateway string = "store-gateway" + MemberlistKV string = "memberlist-kv" + DataPurger string = "data-purger" + All string = "all" ) -func (m ModuleName) String() string { - return string(m) -} - -func (m *ModuleName) Set(s string) error { - l := ModuleName(strings.ToLower(s)) - if _, ok := modules[l]; !ok { - return fmt.Errorf("unrecognised module name: %s", s) - } - *m = l - return nil -} - -func (m ModuleName) MarshalYAML() (interface{}, error) { - return m.String(), nil -} +func (t *Cortex) initAPI() (services.Service, error) { + t.Cfg.API.ServerPrefix = t.Cfg.Server.PathPrefix + t.Cfg.API.LegacyHTTPPrefix = t.Cfg.HTTPPrefix -func (m *ModuleName) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - return m.Set(s) -} - -func (t *Cortex) initAPI(cfg *Config) (services.Service, error) { - cfg.API.ServerPrefix = cfg.Server.PathPrefix - cfg.API.LegacyHTTPPrefix = cfg.HTTPPrefix - - a, err := api.New(cfg.API, t.server, util.Logger) + a, err := api.New(t.Cfg.API, t.Server, util.Logger) if err != nil { return nil, err } - t.api = a + t.API = a - t.api.RegisterAPI(cfg) + t.API.RegisterAPI(t.Cfg) return nil, nil } -func (t *Cortex) initServer(cfg *Config) (services.Service, error) { - serv, err := server.New(cfg.Server) +func (t *Cortex) initServer() (services.Service, error) { + serv, err := server.New(t.Cfg.Server) if err != nil { return nil, err } - t.server = serv + t.Server = serv servicesToWaitFor := func() []services.Service { svs := []services.Service(nil) - for m, s := range t.serviceMap { + for m, s := range t.ServiceMap { // Server should not wait for itself. if m != Server { svs = append(svs, s) @@ -125,92 +97,92 @@ func (t *Cortex) initServer(cfg *Config) (services.Service, error) { return svs } - s := NewServerService(t.server, servicesToWaitFor) + s := NewServerService(t.Server, servicesToWaitFor) return s, nil } -func (t *Cortex) initRing(cfg *Config) (serv services.Service, err error) { - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey) +func (t *Cortex) initRing() (serv services.Service, err error) { + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey) if err != nil { return nil, err } - prometheus.MustRegister(t.ring) + prometheus.MustRegister(t.Ring) - t.api.RegisterRing(t.ring) + t.API.RegisterRing(t.Ring) - return t.ring, nil + return t.Ring, nil } -func (t *Cortex) initRuntimeConfig(cfg *Config) (services.Service, error) { - if cfg.RuntimeConfig.LoadPath == "" { - cfg.RuntimeConfig.LoadPath = cfg.LimitsConfig.PerTenantOverrideConfig - cfg.RuntimeConfig.ReloadPeriod = cfg.LimitsConfig.PerTenantOverridePeriod +func (t *Cortex) initRuntimeConfig() (services.Service, error) { + if t.Cfg.RuntimeConfig.LoadPath == "" { + t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig + t.Cfg.RuntimeConfig.ReloadPeriod = t.Cfg.LimitsConfig.PerTenantOverridePeriod } - cfg.RuntimeConfig.Loader = loadRuntimeConfig + t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig // make sure to set default limits before we start loading configuration into memory - validation.SetDefaultLimitsForYAMLUnmarshalling(cfg.LimitsConfig) + validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig) - serv, err := runtimeconfig.NewRuntimeConfigManager(cfg.RuntimeConfig, prometheus.DefaultRegisterer) - t.runtimeConfig = serv + serv, err := runtimeconfig.NewRuntimeConfigManager(t.Cfg.RuntimeConfig, prometheus.DefaultRegisterer) + t.RuntimeConfig = serv return serv, err } -func (t *Cortex) initOverrides(cfg *Config) (serv services.Service, err error) { - t.overrides, err = validation.NewOverrides(cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig)) +func (t *Cortex) initOverrides() (serv services.Service, err error) { + t.Overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.RuntimeConfig)) // overrides don't have operational state, nor do they need to do anything more in starting/stopping phase, // so there is no need to return any service. return nil, err } -func (t *Cortex) initDistributor(cfg *Config) (serv services.Service, err error) { - cfg.Distributor.DistributorRing.ListenPort = cfg.Server.GRPCListenPort - cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV +func (t *Cortex) initDistributor() (serv services.Service, err error) { + t.Cfg.Distributor.DistributorRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV // Check whether the distributor can join the distributors ring, which is // whenever it's not running as an internal dependency (ie. querier or // ruler's dependency) - canJoinDistributorsRing := (cfg.Target == All || cfg.Target == Distributor) + canJoinDistributorsRing := (t.Cfg.Target == All || t.Cfg.Target == Distributor) - t.distributor, err = distributor.New(cfg.Distributor, cfg.IngesterClient, t.overrides, t.ring, canJoinDistributorsRing) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.Ring, canJoinDistributorsRing) if err != nil { return } - t.api.RegisterDistributor(t.distributor, cfg.Distributor) + t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor) - return t.distributor, nil + return t.Distributor, nil } -func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { +func (t *Cortex) initQuerier() (serv services.Service, err error) { var tombstonesLoader *purger.TombstonesLoader - if cfg.DataPurgerConfig.Enable { - tombstonesLoader = purger.NewTombstonesLoader(t.deletesStore) + if t.Cfg.DataPurgerConfig.Enable { + tombstonesLoader = purger.NewTombstonesLoader(t.DeletesStore) } else { // until we need to explicitly enable delete series support we need to do create TombstonesLoader without DeleteStore which acts as noop tombstonesLoader = purger.NewTombstonesLoader(nil) } - queryable, engine := querier.New(cfg.Querier, t.distributor, t.storeQueryable, tombstonesLoader, prometheus.DefaultRegisterer) + queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryable, tombstonesLoader, prometheus.DefaultRegisterer) // if we are not configured for single binary mode then the querier needs to register its paths externally - registerExternally := cfg.Target != All - handler := t.api.RegisterQuerier(queryable, engine, t.distributor, registerExternally) + registerExternally := t.Cfg.Target != All + handler := t.API.RegisterQuerier(queryable, engine, t.Distributor, registerExternally) // single binary mode requires a properly configured worker. if the operator did not attempt to configure the // worker we will attempt an automatic configuration here - if cfg.Worker.Address == "" && cfg.Target == All { - address := fmt.Sprintf("127.0.0.1:%d", cfg.Server.GRPCListenPort) + if t.Cfg.Worker.Address == "" && t.Cfg.Target == All { + address := fmt.Sprintf("127.0.0.1:%d", t.Cfg.Server.GRPCListenPort) level.Warn(util.Logger).Log("msg", "Worker address is empty in single binary mode. Attempting automatic worker configuration. If queries are unresponsive consider configuring the worker explicitly.", "address", address) - cfg.Worker.Address = address + t.Cfg.Worker.Address = address } // Query frontend worker will only be started after all its dependencies are started, not here. // Worker may also be nil, if not configured, which is OK. - worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger) + worker, err := frontend.NewWorker(t.Cfg.Worker, httpgrpc_server.NewServer(handler), util.Logger) if err != nil { return } @@ -218,89 +190,89 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) { return worker, nil } -func (t *Cortex) initStoreQueryable(cfg *Config) (services.Service, error) { - if cfg.Storage.Engine == storage.StorageEngineChunks { - t.storeQueryable = querier.NewChunkStoreQueryable(cfg.Querier, t.store) +func (t *Cortex) initStoreQueryable() (services.Service, error) { + if t.Cfg.Storage.Engine == storage.StorageEngineChunks { + t.StoreQueryable = querier.NewChunkStoreQueryable(t.Cfg.Querier, t.Store) return nil, nil } - if cfg.Storage.Engine == storage.StorageEngineTSDB { - storeQueryable, err := querier.NewBlockQueryable(cfg.TSDB, cfg.Server.LogLevel, prometheus.DefaultRegisterer) + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB { + storeQueryable, err := querier.NewBlockQueryable(t.Cfg.TSDB, t.Cfg.Server.LogLevel, prometheus.DefaultRegisterer) if err != nil { return nil, err } - t.storeQueryable = storeQueryable + t.StoreQueryable = storeQueryable return storeQueryable, nil } - return nil, fmt.Errorf("unknown storage engine '%s'", cfg.Storage.Engine) + return nil, fmt.Errorf("unknown storage engine '%s'", t.Cfg.Storage.Engine) } -func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) { - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) - cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort - cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB - cfg.Ingester.TSDBConfig = cfg.TSDB - cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels +func (t *Cortex) initIngester() (serv services.Service, err error) { + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.RuntimeConfig) + t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + t.Cfg.Ingester.LifecyclerConfig.ListenPort = &t.Cfg.Server.GRPCListenPort + t.Cfg.Ingester.TSDBEnabled = t.Cfg.Storage.Engine == storage.StorageEngineTSDB + t.Cfg.Ingester.TSDBConfig = t.Cfg.TSDB + t.Cfg.Ingester.ShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels - t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Overrides, t.Store, prometheus.DefaultRegisterer) if err != nil { return } - t.api.RegisterIngester(t.ingester, cfg.Distributor) + t.API.RegisterIngester(t.Ingester, t.Cfg.Distributor) - return t.ingester, nil + return t.Ingester, nil } -func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) { - t.flusher, err = flusher.New( - cfg.Flusher, - cfg.Ingester, - cfg.IngesterClient, - t.store, +func (t *Cortex) initFlusher() (serv services.Service, err error) { + t.Flusher, err = flusher.New( + t.Cfg.Flusher, + t.Cfg.Ingester, + t.Cfg.IngesterClient, + t.Store, prometheus.DefaultRegisterer, ) if err != nil { return } - return t.flusher, nil + return t.Flusher, nil } -func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) { - if cfg.Storage.Engine == storage.StorageEngineTSDB { +func (t *Cortex) initStore() (serv services.Service, err error) { + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB { return nil, nil } - err = cfg.Schema.Load() + err = t.Cfg.Schema.Load() if err != nil { return } - t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides) + t.Store, err = storage.NewStore(t.Cfg.Storage, t.Cfg.ChunkStore, t.Cfg.Schema, t.Overrides) if err != nil { return } return services.NewIdleService(nil, func(_ error) error { - t.store.Stop() + t.Store.Stop() return nil }), nil } -func (t *Cortex) initDeleteRequestsStore(cfg *Config) (serv services.Service, err error) { - if !cfg.DataPurgerConfig.Enable { +func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { + if !t.Cfg.DataPurgerConfig.Enable { return } var indexClient chunk.IndexClient - indexClient, err = storage.NewIndexClient(cfg.Storage.DeleteStoreConfig.Store, cfg.Storage, cfg.Schema) + indexClient, err = storage.NewIndexClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage, t.Cfg.Schema) if err != nil { return } - t.deletesStore, err = purger.NewDeleteStore(cfg.Storage.DeleteStoreConfig, indexClient) + t.DeletesStore, err = purger.NewDeleteStore(t.Cfg.Storage.DeleteStoreConfig, indexClient) if err != nil { return } @@ -308,322 +280,257 @@ func (t *Cortex) initDeleteRequestsStore(cfg *Config) (serv services.Service, er return } -func (t *Cortex) initQueryFrontend(cfg *Config) (serv services.Service, err error) { +func (t *Cortex) initQueryFrontend() (serv services.Service, err error) { // Load the schema only if sharded queries is set. - if cfg.QueryRange.ShardedQueries { - err = cfg.Schema.Load() + if t.Cfg.QueryRange.ShardedQueries { + err = t.Cfg.Schema.Load() if err != nil { return } } - t.frontend, err = frontend.New(cfg.Frontend, util.Logger, prometheus.DefaultRegisterer) + t.Frontend, err = frontend.New(t.Cfg.Frontend, util.Logger, prometheus.DefaultRegisterer) if err != nil { return } tripperware, cache, err := queryrange.NewTripperware( - cfg.QueryRange, + t.Cfg.QueryRange, util.Logger, - t.overrides, + t.Overrides, queryrange.PrometheusCodec, queryrange.PrometheusResponseExtractor, - cfg.Schema, + t.Cfg.Schema, promql.EngineOpts{ Logger: util.Logger, Reg: prometheus.DefaultRegisterer, - MaxSamples: cfg.Querier.MaxSamples, - Timeout: cfg.Querier.Timeout, + MaxSamples: t.Cfg.Querier.MaxSamples, + Timeout: t.Cfg.Querier.Timeout, }, - cfg.Querier.QueryIngestersWithin, + t.Cfg.Querier.QueryIngestersWithin, prometheus.DefaultRegisterer, ) if err != nil { return nil, err } - t.cache = cache - t.frontend.Wrap(tripperware) + t.Cache = cache + t.Frontend.Wrap(tripperware) - t.api.RegisterQueryFrontend(t.frontend) + t.API.RegisterQueryFrontend(t.Frontend) return services.NewIdleService(nil, func(_ error) error { - t.frontend.Close() - if t.cache != nil { - t.cache.Stop() - t.cache = nil + t.Frontend.Close() + if t.Cache != nil { + t.Cache.Stop() + t.Cache = nil } return nil }), nil } -func (t *Cortex) initTableManager(cfg *Config) (services.Service, error) { - if cfg.Storage.Engine == storage.StorageEngineTSDB { +func (t *Cortex) initTableManager() (services.Service, error) { + if t.Cfg.Storage.Engine == storage.StorageEngineTSDB { return nil, nil // table manager isn't used in v2 } - err := cfg.Schema.Load() + err := t.Cfg.Schema.Load() if err != nil { return nil, err } // Assume the newest config is the one to use - lastConfig := &cfg.Schema.Configs[len(cfg.Schema.Configs)-1] - - if (cfg.TableManager.ChunkTables.WriteScale.Enabled || - cfg.TableManager.IndexTables.WriteScale.Enabled || - cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || - cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || - cfg.TableManager.ChunkTables.ReadScale.Enabled || - cfg.TableManager.IndexTables.ReadScale.Enabled || - cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || - cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && - cfg.Storage.AWSStorageConfig.Metrics.URL == "" { + lastConfig := &t.Cfg.Schema.Configs[len(t.Cfg.Schema.Configs)-1] + + if (t.Cfg.TableManager.ChunkTables.WriteScale.Enabled || + t.Cfg.TableManager.IndexTables.WriteScale.Enabled || + t.Cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled || + t.Cfg.TableManager.IndexTables.InactiveWriteScale.Enabled || + t.Cfg.TableManager.ChunkTables.ReadScale.Enabled || + t.Cfg.TableManager.IndexTables.ReadScale.Enabled || + t.Cfg.TableManager.ChunkTables.InactiveReadScale.Enabled || + t.Cfg.TableManager.IndexTables.InactiveReadScale.Enabled) && + t.Cfg.Storage.AWSStorageConfig.Metrics.URL == "" { level.Error(util.Logger).Log("msg", "WriteScale is enabled but no Metrics URL has been provided") os.Exit(1) } - tableClient, err := storage.NewTableClient(lastConfig.IndexType, cfg.Storage) + tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.Storage) if err != nil { return nil, err } - bucketClient, err := storage.NewBucketClient(cfg.Storage) + bucketClient, err := storage.NewBucketClient(t.Cfg.Storage) util.CheckFatal("initializing bucket client", err) - t.tableManager, err = chunk.NewTableManager(cfg.TableManager, cfg.Schema, cfg.Ingester.MaxChunkAge, tableClient, bucketClient, prometheus.DefaultRegisterer) - return t.tableManager, err + t.TableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.Schema, t.Cfg.Ingester.MaxChunkAge, tableClient, bucketClient, prometheus.DefaultRegisterer) + return t.TableManager, err } -func (t *Cortex) initRuler(cfg *Config) (serv services.Service, err error) { +func (t *Cortex) initRuler() (serv services.Service, err error) { var tombstonesLoader *purger.TombstonesLoader - if cfg.DataPurgerConfig.Enable { - tombstonesLoader = purger.NewTombstonesLoader(t.deletesStore) + if t.Cfg.DataPurgerConfig.Enable { + tombstonesLoader = purger.NewTombstonesLoader(t.DeletesStore) } else { // until we need to explicitly enable delete series support we need to do create TombstonesLoader without DeleteStore which acts as noop tombstonesLoader = purger.NewTombstonesLoader(nil) } - cfg.Ruler.Ring.ListenPort = cfg.Server.GRPCListenPort - cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV - queryable, engine := querier.New(cfg.Querier, t.distributor, t.storeQueryable, tombstonesLoader, prometheus.DefaultRegisterer) + t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV + queryable, engine := querier.New(t.Cfg.Querier, t.Distributor, t.StoreQueryable, tombstonesLoader, prometheus.DefaultRegisterer) - t.ruler, err = ruler.NewRuler(cfg.Ruler, engine, queryable, t.distributor, prometheus.DefaultRegisterer, util.Logger) + t.Ruler, err = ruler.NewRuler(t.Cfg.Ruler, engine, queryable, t.Distributor, prometheus.DefaultRegisterer, util.Logger) if err != nil { return } // Expose HTTP endpoints. - t.api.RegisterRuler(t.ruler, cfg.Ruler.EnableAPI) + t.API.RegisterRuler(t.Ruler, t.Cfg.Ruler.EnableAPI) - return t.ruler, nil + return t.Ruler, nil } -func (t *Cortex) initConfig(cfg *Config) (serv services.Service, err error) { - t.configDB, err = db.New(cfg.Configs.DB) +func (t *Cortex) initConfig() (serv services.Service, err error) { + t.ConfigDB, err = db.New(t.Cfg.Configs.DB) if err != nil { return } - t.configAPI = configAPI.New(t.configDB, cfg.Configs.API) - t.configAPI.RegisterRoutes(t.server.HTTP) + t.ConfigAPI = configAPI.New(t.ConfigDB, t.Cfg.Configs.API) + t.ConfigAPI.RegisterRoutes(t.Server.HTTP) return services.NewIdleService(nil, func(_ error) error { - t.configDB.Close() + t.ConfigDB.Close() return nil }), nil } -func (t *Cortex) initAlertManager(cfg *Config) (serv services.Service, err error) { - t.alertmanager, err = alertmanager.NewMultitenantAlertmanager(&cfg.Alertmanager, util.Logger, prometheus.DefaultRegisterer) +func (t *Cortex) initAlertManager() (serv services.Service, err error) { + t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, util.Logger, prometheus.DefaultRegisterer) if err != nil { return } - t.api.RegisterAlertmanager(t.alertmanager, cfg.Target == AlertManager) - return t.alertmanager, nil + t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Target == AlertManager) + return t.Alertmanager, nil } -func (t *Cortex) initCompactor(cfg *Config) (serv services.Service, err error) { - cfg.Compactor.ShardingRing.ListenPort = cfg.Server.GRPCListenPort - cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV +func (t *Cortex) initCompactor() (serv services.Service, err error) { + t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.Compactor.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.compactor, err = compactor.NewCompactor(cfg.Compactor, cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) + t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.TSDB, util.Logger, prometheus.DefaultRegisterer) if err != nil { return } // Expose HTTP endpoints. - t.api.RegisterCompactor(t.compactor) - return t.compactor, nil + t.API.RegisterCompactor(t.Compactor) + return t.Compactor, nil } -func (t *Cortex) initStoreGateway(cfg *Config) (serv services.Service, err error) { - if cfg.Storage.Engine != storage.StorageEngineTSDB { +func (t *Cortex) initStoreGateway() (serv services.Service, err error) { + if t.Cfg.Storage.Engine != storage.StorageEngineTSDB { return nil, nil } - cfg.StoreGateway.ShardingRing.ListenPort = cfg.Server.GRPCListenPort - cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV + t.Cfg.StoreGateway.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort + t.Cfg.StoreGateway.ShardingRing.KVStore.MemberlistKV = t.MemberlistKV.GetMemberlistKV - t.storeGateway, err = storegateway.NewStoreGateway(cfg.StoreGateway, cfg.TSDB, cfg.Server.LogLevel, util.Logger, prometheus.DefaultRegisterer) + t.StoreGateway, err = storegateway.NewStoreGateway(t.Cfg.StoreGateway, t.Cfg.TSDB, t.Cfg.Server.LogLevel, util.Logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } // Expose HTTP endpoints. - t.api.RegisterStoreGateway(t.storeGateway) + t.API.RegisterStoreGateway(t.StoreGateway) - return t.storeGateway, nil + return t.StoreGateway, nil } -func (t *Cortex) initMemberlistKV(cfg *Config) (services.Service, error) { - cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer - cfg.MemberlistKV.Codecs = []codec.Codec{ +func (t *Cortex) initMemberlistKV() (services.Service, error) { + t.Cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer + t.Cfg.MemberlistKV.Codecs = []codec.Codec{ ring.GetCodec(), } - t.memberlistKV = memberlist.NewKVInit(&cfg.MemberlistKV) + t.MemberlistKV = memberlist.NewKVInit(&t.Cfg.MemberlistKV) return services.NewIdleService(nil, func(_ error) error { - t.memberlistKV.Stop() + t.MemberlistKV.Stop() return nil }), nil } -func (t *Cortex) initDataPurger(cfg *Config) (services.Service, error) { - if !cfg.DataPurgerConfig.Enable { +func (t *Cortex) initDataPurger() (services.Service, error) { + if !t.Cfg.DataPurgerConfig.Enable { return nil, nil } - storageClient, err := storage.NewObjectClient(cfg.DataPurgerConfig.ObjectStoreType, cfg.Storage) + storageClient, err := storage.NewObjectClient(t.Cfg.DataPurgerConfig.ObjectStoreType, t.Cfg.Storage) if err != nil { return nil, err } - t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient) + t.DataPurger, err = purger.NewDataPurger(t.Cfg.DataPurgerConfig, t.DeletesStore, t.Store, storageClient) if err != nil { return nil, err } - t.api.RegisterPurger(t.deletesStore) - - return t.dataPurger, nil -} - -type module struct { - deps []ModuleName - - // service for this module (can return nil) - service func(t *Cortex, cfg *Config) (services.Service, error) + t.API.RegisterPurger(t.DeletesStore) - // service that will be wrapped into moduleServiceWrapper, to wait for dependencies to start / end - // (can return nil) - wrappedService func(t *Cortex, cfg *Config) (services.Service, error) + return t.DataPurger, nil } -var modules = map[ModuleName]module{ - Server: { - // we cannot use 'wrappedService', as stopped Server service is currently a signal to Cortex - // that it should shutdown. If we used wrappedService, it wouldn't stop until - // all services that depend on it stopped first... but there is nothing that would make them stop. - service: (*Cortex).initServer, - }, - - API: { - deps: []ModuleName{Server}, - wrappedService: (*Cortex).initAPI, - }, - - RuntimeConfig: { - wrappedService: (*Cortex).initRuntimeConfig, - }, - - MemberlistKV: { - wrappedService: (*Cortex).initMemberlistKV, - }, - - Ring: { - deps: []ModuleName{API, RuntimeConfig, MemberlistKV}, - wrappedService: (*Cortex).initRing, - }, - - Overrides: { - deps: []ModuleName{RuntimeConfig}, - wrappedService: (*Cortex).initOverrides, - }, - - Distributor: { - deps: []ModuleName{Ring, API, Overrides}, - wrappedService: (*Cortex).initDistributor, - }, - - Store: { - deps: []ModuleName{Overrides}, - wrappedService: (*Cortex).initStore, - }, - - DeleteRequestsStore: { - wrappedService: (*Cortex).initDeleteRequestsStore, - }, - - Ingester: { - deps: []ModuleName{Overrides, Store, API, RuntimeConfig, MemberlistKV}, - wrappedService: (*Cortex).initIngester, - }, - - Flusher: { - deps: []ModuleName{Store, API}, - wrappedService: (*Cortex).initFlusher, - }, - - Querier: { - deps: []ModuleName{Distributor, Store, Ring, API, StoreQueryable}, - wrappedService: (*Cortex).initQuerier, - }, - - StoreQueryable: { - deps: []ModuleName{Store, DeleteRequestsStore}, - wrappedService: (*Cortex).initStoreQueryable, - }, - - QueryFrontend: { - deps: []ModuleName{API, Overrides}, - wrappedService: (*Cortex).initQueryFrontend, - }, - - TableManager: { - deps: []ModuleName{API}, - wrappedService: (*Cortex).initTableManager, - }, - - Ruler: { - deps: []ModuleName{Distributor, Store, StoreQueryable}, - wrappedService: (*Cortex).initRuler, - }, - - Configs: { - deps: []ModuleName{API}, - wrappedService: (*Cortex).initConfig, - }, - - AlertManager: { - deps: []ModuleName{API}, - wrappedService: (*Cortex).initAlertManager, - }, - - Compactor: { - deps: []ModuleName{API}, - wrappedService: (*Cortex).initCompactor, - }, - - StoreGateway: { - deps: []ModuleName{API}, - wrappedService: (*Cortex).initStoreGateway, - }, - - DataPurger: { - deps: []ModuleName{Store, DeleteRequestsStore, API}, - wrappedService: (*Cortex).initDataPurger, - }, - - All: { - deps: []ModuleName{QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway}, - }, +func (t *Cortex) createModuleManager() modules.Manager { + mm := modules.NewManager() + + // Register all modules here. + // RegisterModule(name string, initFn func()(services.Service, error)) + mm.RegisterModule(Server, t.initServer) + mm.RegisterModule(API, t.initRuntimeConfig) + mm.RegisterModule(MemberlistKV, t.initMemberlistKV) + mm.RegisterModule(Ring, t.initRing) + mm.RegisterModule(Overrides, t.initOverrides) + mm.RegisterModule(Distributor, t.initDistributor) + mm.RegisterModule(Store, t.initStore) + mm.RegisterModule(DeleteRequestsStore, t.initDeleteRequestsStore) + mm.RegisterModule(Ingester, t.initIngester) + mm.RegisterModule(Flusher, t.initFlusher) + mm.RegisterModule(Querier, t.initQuerier) + mm.RegisterModule(StoreQueryable, t.initStoreQueryable) + mm.RegisterModule(QueryFrontend, t.initQueryFrontend) + mm.RegisterModule(TableManager, t.initTableManager) + mm.RegisterModule(Ruler, t.initRuler) + mm.RegisterModule(Configs, t.initConfig) + mm.RegisterModule(AlertManager, t.initAlertManager) + mm.RegisterModule(Compactor, t.initCompactor) + mm.RegisterModule(StoreGateway, t.initStoreGateway) + mm.RegisterModule(DataPurger, t.initDataPurger) + mm.RegisterModule(All, nil) + mm.RegisterModule(StoreGateway, t.initStoreGateway) + + // Add dependencies + deps := map[string][]string{ + API: []string{Server}, + Ring: []string{API, RuntimeConfig, MemberlistKV}, + Overrides: []string{RuntimeConfig}, + Distributor: []string{Ring, API, Overrides}, + Store: []string{Overrides, DeleteRequestsStore}, + Ingester: []string{Overrides, Store, API, RuntimeConfig, MemberlistKV}, + Flusher: []string{Store, API}, + Querier: []string{Distributor, Store, Ring, API, StoreQueryable}, + StoreQueryable: []string{Store}, + QueryFrontend: []string{API, Overrides, DeleteRequestsStore}, + TableManager: []string{API}, + Ruler: []string{Distributor, Store, StoreQueryable}, + Configs: []string{API}, + AlertManager: []string{API}, + Compactor: []string{API}, + StoreGateway: []string{API}, + DataPurger: []string{Store, DeleteRequestsStore, API}, + All: []string{QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway} + } + for mod, targets := range deps { + mm.AddDependency(m, targets...) + } + + return mm } diff --git a/pkg/cortex/module_service_wrapper.go b/pkg/util/modules/module_service_wrapper.go similarity index 63% rename from pkg/cortex/module_service_wrapper.go rename to pkg/util/modules/module_service_wrapper.go index 2ac4cc87e53..73f1014bd58 100644 --- a/pkg/cortex/module_service_wrapper.go +++ b/pkg/util/modules/module_service_wrapper.go @@ -1,4 +1,4 @@ -package cortex +package modules import ( "github.com/cortexproject/cortex/pkg/util" @@ -7,19 +7,19 @@ import ( // This function wraps module service, and adds waiting for dependencies to start before starting, // and dependant modules to stop before stopping this module service. -func newModuleServiceWrapper(serviceMap map[ModuleName]services.Service, mod ModuleName, modServ services.Service, startDeps []ModuleName, stopDeps []ModuleName) services.Service { - getDeps := func(deps []ModuleName) map[string]services.Service { +func newModuleServiceWrapper(serviceMap map[string]services.Service, mod string, modServ services.Service, startDeps []string, stopDeps []string) services.Service { + getDeps := func(deps []string) map[string]services.Service { r := map[string]services.Service{} for _, m := range deps { s := serviceMap[m] if s != nil { - r[string(m)] = s + r[m] = s } } return r } - return util.NewModuleService(string(mod), modServ, + return util.NewModuleService(mod, modServ, func(_ string) map[string]services.Service { return getDeps(startDeps) }, diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go new file mode 100644 index 00000000000..46e18199e28 --- /dev/null +++ b/pkg/util/modules/modules.go @@ -0,0 +1,158 @@ +package modules + +import ( + "fmt" + + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/pkg/errors" +) + +type service func() (services.Service, error) + +// module is the basic building block of the application +type module struct { + // dependencies of this module + deps []string + + // initFn for this module (can return nil) + initFn service +} + +// Manager is a component that initialises modules of the application +// in the right order of dependencies. +type Manager struct { + modules map[string]module +} + +// NewManager creates a new Manager +func NewManager() *Manager { + return &Manager{ + modules: make(map[string]module), + } +} + +// RegisterModule registers a new module with ModuleManager +func (m *Manager) RegisterModule(name string, initFn service) { + m.modules[name] = module{ + initFn: initFn, + options: options, + } + return +} + +// AddDependency adds a dependency from name(source) to dependsOn(targets) +func (m *Manager) AddDependency(name string, dependsOn ...string) error { + if mod, ok := m.modules[name]; ok { + for dep := range dependsOn { + if _, ok := m.modules[dep]; ok { + mod.deps = append(mod.deps, dep) + } else { + return fmt.Errorf("no such module: %s", dep) + } + } + } else { + return fmt.Errorf("no such module: %s", name) + } + return nil +} + +// InitModuleServices starts the target module +func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { + servicesMap := map[ModuleName]services.Service{} + + // initialize all of our dependencies first + deps := m.orderedDeps(target) + deps = append(deps, target) // lastly, initialize the requested module + + for ix, n := range deps { + mod := modules[n] + + var serv services.Service + + if mod.initFn != nil { + s, err := mod.initFn() + if err != nil { + return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) + } + + invDeps := m.findInverseDependencies(n, deps[ix+1:]) + if s == nil { + if invDeps != nil { + return nil, fmt.Errorf("module %s returned nil service but has other modules dependent on it", n) + } + } else { + // We pass servicesMap, which isn't yet complete. By the time service starts, + // it will be fully built, so there is no need for extra synchronization. + serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, invDeps) + } + } + + if serv != nil { + servicesMap[n] = serv + } + } + + return servicesMap, nil +} + +// listDeps recursively gets a list of dependencies for a passed moduleName +func (m *Manager) listDeps(mod string) []string { + deps := m.modules[mod].deps + for _, d := range m.modules[mod].deps { + deps = append(deps, m.listDeps(d)...) + } + return deps +} + +// orderedDeps gets a list of all dependencies ordered so that items are always after any of their dependencies. +func (m *Manager) orderedDeps(mod string) []string { + deps := m.listDeps(mod) + + // get a unique list of moduleNames, with a flag for whether they have been added to our result + uniq := map[string]bool{} + for _, dep := range deps { + uniq[dep] = false + } + + result := make([]string, 0, len(uniq)) + + // keep looping through all modules until they have all been added to the result. + + for len(result) < len(uniq) { + OUTER: + for name, added := range uniq { + if added { + continue + } + for _, dep := range m.modules[name].deps { + // stop processing this module if one of its dependencies has + // not been added to the result yet. + if !uniq[dep] { + continue OUTER + } + } + + // if all of the module's dependencies have been added to the result slice, + // then we can safely add this module to the result slice as well. + uniq[name] = true + result = append(result, name) + } + } + return result +} + +// find modules in the supplied list, that depend on mod +func (m *Manager) findInverseDependencies(mod string, mods []string) []string { + result := []string(nil) + + for _, n := range mods { + for _, d := range m.modules[n].deps { + if d == mod { + result = append(result, n) + break + } + } + } + + return result +} diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go new file mode 100644 index 00000000000..e45f2657455 --- /dev/null +++ b/pkg/util/modules/modules_test.go @@ -0,0 +1,40 @@ +package modules + +import ( + "testing" + + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/stretchr/testify/assert" +) + +func mockInitFunc() (services.Service, error) { return nil, nil } + +func TestDependencies(t *testing.T) { + var testModules = map[string]module{ + "serviceA": { + initService: mockInitFunc, + }, + + "serviceB": { + deps: []string{"serviceA"}, + initService: mockInitFunc, + }, + + "serviceC": { + deps: []string{"serviceB"}, + initService: mockInitFunc, + }, + } + + mm := &moduleManager{} + for name, mod := range testModules { + mm.RegisterModule(name, mod.deps, mod.initService, nil) + } + svcs, err := mm.StartModule("serviceC") + assert.NotNil(t, svcs) + assert.NoError(t, err) + + invDeps := mm.findInverseDependencies("serviceB") + assert.Len(t, invDeps, 1) + assert.Equal(t, invDeps[0], "serviceC") +} From e4126037cd617364cd35579e1a8a23af85c1ed45 Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 5 May 2020 19:59:33 +0530 Subject: [PATCH 02/11] Fix build and tests Signed-off-by: Annanay --- cmd/cortex/main.go | 2 +- pkg/cortex/cortex.go | 4 +-- pkg/cortex/cortex_test.go | 12 ++++----- pkg/cortex/modules.go | 42 ++++++++++++++++---------------- pkg/cortex/status.go | 2 +- pkg/util/modules/modules.go | 9 +++---- pkg/util/modules/modules_test.go | 18 +++++++------- 7 files changed, 44 insertions(+), 45 deletions(-) diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 4c0e0cd382e..19558579196 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -104,7 +104,7 @@ func main() { util.InitEvents(eventSampleRate) // Setting the environment variable JAEGER_AGENT_HOST enables tracing - if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target.String()); err != nil { + if trace, err := tracing.NewFromEnv("cortex-" + cfg.Target); err != nil { level.Error(util.Logger).Log("msg", "Failed to setup tracing", "err", err.Error()) } else { defer trace.Close() diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index fdf305d9af2..5bc366c407c 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -189,7 +189,7 @@ type Cortex struct { // set during initialization ServiceMap map[string]services.Service - ModuleManager modules.Manager + ModuleManager *modules.Manager API *api.API Server *server.Server @@ -274,7 +274,7 @@ func (t *Cortex) setupAuthMiddleware(cfg *Config) { // Run starts Cortex running, and blocks until a Cortex stops. func (t *Cortex) Run() error { - serviceMap, err := t.ModuleManager.StartModule(t.Cfg.Target) + serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target) if err != nil { return err } diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 8795c3d8b25..d8aaec8fc13 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -52,18 +52,18 @@ func TestCortex(t *testing.T) { c, err := New(cfg) require.NoError(t, err) - require.NotNil(t, c.serviceMap) + require.NotNil(t, c.ServiceMap) - for m, s := range c.serviceMap { + for m, s := range c.ServiceMap { // make sure each service is still New require.Equal(t, services.New, s.State(), "module: %s", m) } // check random modules that we expect to be configured when using Target=All - require.NotNil(t, c.serviceMap[Server]) - require.NotNil(t, c.serviceMap[Ingester]) - require.NotNil(t, c.serviceMap[Ring]) - require.NotNil(t, c.serviceMap[Distributor]) + require.NotNil(t, c.ServiceMap[Server]) + require.NotNil(t, c.ServiceMap[Ingester]) + require.NotNil(t, c.ServiceMap[Ring]) + require.NotNil(t, c.ServiceMap[Distributor]) // check that findInverseDependencie for Ring -- querier and distributor depend on Ring, so should be returned. // require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps)) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index c3791af0916..d0de6d8a72f 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -174,7 +174,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { // Query frontend worker will only be started after all its dependencies are started, not here. // Worker may also be nil, if not configured, which is OK. - worker, err := frontend.NewWorker(t.Cfg.Worker, cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger) + worker, err := frontend.NewWorker(t.Cfg.Worker, t.Cfg.Querier, httpgrpc_server.NewServer(handler), util.Logger) if err != nil { return } @@ -287,7 +287,7 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { return } - t.TombstonesLoader = purger.NewTombstonesLoader(t.deletesStore, prometheus.DefaultRegisterer) + t.TombstonesLoader = purger.NewTombstonesLoader(t.DeletesStore, prometheus.DefaultRegisterer) return } @@ -485,7 +485,7 @@ func (t *Cortex) initDataPurger() (services.Service, error) { return t.DataPurger, nil } -func (t *Cortex) createModuleManager() modules.Manager { +func (t *Cortex) createModuleManager() *modules.Manager { mm := modules.NewManager() // Register all modules here. @@ -516,27 +516,27 @@ func (t *Cortex) createModuleManager() modules.Manager { // Add dependencies deps := map[string][]string{ - API: []string{Server}, - Ring: []string{API, RuntimeConfig, MemberlistKV}, - Overrides: []string{RuntimeConfig}, - Distributor: []string{Ring, API, Overrides}, - Store: []string{Overrides, DeleteRequestsStore}, - Ingester: []string{Overrides, Store, API, RuntimeConfig, MemberlistKV}, - Flusher: []string{Store, API}, - Querier: []string{Distributor, Store, Ring, API, StoreQueryable}, + API: []string{Server}, + Ring: []string{API, RuntimeConfig, MemberlistKV}, + Overrides: []string{RuntimeConfig}, + Distributor: []string{Ring, API, Overrides}, + Store: []string{Overrides, DeleteRequestsStore}, + Ingester: []string{Overrides, Store, API, RuntimeConfig, MemberlistKV}, + Flusher: []string{Store, API}, + Querier: []string{Distributor, Store, Ring, API, StoreQueryable}, StoreQueryable: []string{Store}, - QueryFrontend: []string{API, Overrides, DeleteRequestsStore}, - TableManager: []string{API}, - Ruler: []string{Distributor, Store, StoreQueryable}, - Configs: []string{API}, - AlertManager: []string{API}, - Compactor: []string{API}, - StoreGateway: []string{API}, - DataPurger: []string{Store, DeleteRequestsStore, API}, - All: []string{QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway} + QueryFrontend: []string{API, Overrides, DeleteRequestsStore}, + TableManager: []string{API}, + Ruler: []string{Distributor, Store, StoreQueryable}, + Configs: []string{API}, + AlertManager: []string{API}, + Compactor: []string{API}, + StoreGateway: []string{API}, + DataPurger: []string{Store, DeleteRequestsStore, API}, + All: []string{QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway}, } for mod, targets := range deps { - mm.AddDependency(m, targets...) + mm.AddDependency(mod, targets...) } return mm diff --git a/pkg/cortex/status.go b/pkg/cortex/status.go index dec9a0f1e9b..b7bfe9617ba 100644 --- a/pkg/cortex/status.go +++ b/pkg/cortex/status.go @@ -10,7 +10,7 @@ func (t *Cortex) servicesHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/plain") // TODO: this could be extended to also print sub-services, if given service has any - for mod, s := range t.serviceMap { + for mod, s := range t.ServiceMap { if s != nil { fmt.Fprintf(w, "%v => %v\n", mod, s.State()) } diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 46e18199e28..cf66f8e15b8 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -34,8 +34,7 @@ func NewManager() *Manager { // RegisterModule registers a new module with ModuleManager func (m *Manager) RegisterModule(name string, initFn service) { m.modules[name] = module{ - initFn: initFn, - options: options, + initFn: initFn, } return } @@ -43,7 +42,7 @@ func (m *Manager) RegisterModule(name string, initFn service) { // AddDependency adds a dependency from name(source) to dependsOn(targets) func (m *Manager) AddDependency(name string, dependsOn ...string) error { if mod, ok := m.modules[name]; ok { - for dep := range dependsOn { + for _, dep := range dependsOn { if _, ok := m.modules[dep]; ok { mod.deps = append(mod.deps, dep) } else { @@ -58,14 +57,14 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { // InitModuleServices starts the target module func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { - servicesMap := map[ModuleName]services.Service{} + servicesMap := map[string]services.Service{} // initialize all of our dependencies first deps := m.orderedDeps(target) deps = append(deps, target) // lastly, initialize the requested module for ix, n := range deps { - mod := modules[n] + mod := m.modules[n] var serv services.Service diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index e45f2657455..55c7e3badf0 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -12,29 +12,29 @@ func mockInitFunc() (services.Service, error) { return nil, nil } func TestDependencies(t *testing.T) { var testModules = map[string]module{ "serviceA": { - initService: mockInitFunc, + initFn: mockInitFunc, }, "serviceB": { - deps: []string{"serviceA"}, - initService: mockInitFunc, + initFn: mockInitFunc, }, "serviceC": { - deps: []string{"serviceB"}, - initService: mockInitFunc, + initFn: mockInitFunc, }, } - mm := &moduleManager{} + mm := &Manager{} for name, mod := range testModules { - mm.RegisterModule(name, mod.deps, mod.initService, nil) + mm.RegisterModule(name, mod.initFn) } - svcs, err := mm.StartModule("serviceC") + mm.AddDependency("serviceB", "serviceA") + mm.AddDependency("serviceC", "serviceB") + svcs, err := mm.InitModuleServices("serviceC") assert.NotNil(t, svcs) assert.NoError(t, err) - invDeps := mm.findInverseDependencies("serviceB") + invDeps := mm.findInverseDependencies("serviceB", []string{"serviceA", "serviceC"}) assert.Len(t, invDeps, 1) assert.Equal(t, invDeps[0], "serviceC") } From b009ee7056614c6b2b437b44878a1e774e2208e5 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 6 May 2020 15:12:28 +0530 Subject: [PATCH 03/11] Fix test Signed-off-by: Annanay --- pkg/util/modules/modules_test.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index 55c7e3badf0..e0ea36123e4 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -5,6 +5,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/services" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func mockInitFunc() (services.Service, error) { return nil, nil } @@ -24,17 +25,19 @@ func TestDependencies(t *testing.T) { }, } - mm := &Manager{} + mm := NewManager() for name, mod := range testModules { mm.RegisterModule(name, mod.initFn) } - mm.AddDependency("serviceB", "serviceA") - mm.AddDependency("serviceC", "serviceB") + assert.NoError(t, mm.AddDependency("serviceB", "serviceA")) + assert.NoError(t, mm.AddDependency("serviceC", "serviceB")) + assert.Equal(t, mm.modules["serviceB"].deps, []string{"serviceA"}) + + invDeps := mm.findInverseDependencies("serviceA", []string{"serviceB", "serviceC"}) + require.Len(t, invDeps, 1) + assert.Equal(t, invDeps[0], "serviceB") + svcs, err := mm.InitModuleServices("serviceC") assert.NotNil(t, svcs) assert.NoError(t, err) - - invDeps := mm.findInverseDependencies("serviceB", []string{"serviceA", "serviceC"}) - assert.Len(t, invDeps, 1) - assert.Equal(t, invDeps[0], "serviceC") } From 07fb79430ccc44ed1a080d3b2c24ab75e4e86ef4 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 8 May 2020 13:24:41 +0530 Subject: [PATCH 04/11] Address review comments Signed-off-by: Annanay --- pkg/cortex/cortex.go | 7 +++---- pkg/cortex/modules.go | 8 ++++++-- pkg/util/modules/modules.go | 28 ++++++++++++---------------- 3 files changed, 21 insertions(+), 22 deletions(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index e424f036464..a8d520f147b 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -232,11 +232,10 @@ func New(cfg Config) (*Cortex, error) { cortex := &Cortex{ Cfg: cfg, } - cortex.setupAuthMiddleware() - - mm := cortex.createModuleManager() - cortex.ModuleManager = mm + if err := cortex.setupModuleManager(); err != nil { + return nil, err + } return cortex, nil } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f6e7141c0af..f5b1d31d6b3 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -485,7 +485,7 @@ func (t *Cortex) initDataPurger() (services.Service, error) { return t.DataPurger, nil } -func (t *Cortex) createModuleManager() *modules.Manager { +func (t *Cortex) setupModuleManager() error { mm := modules.NewManager() // Register all modules here. @@ -536,8 +536,12 @@ func (t *Cortex) createModuleManager() *modules.Manager { All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway}, } for mod, targets := range deps { - mm.AddDependency(mod, targets...) + if err := mm.AddDependency(mod, targets...); err != nil { + return err + } } + t.ModuleManager = mm + return mm } diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index cf66f8e15b8..f72f8c6228b 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -7,15 +7,13 @@ import ( "github.com/pkg/errors" ) -type service func() (services.Service, error) - // module is the basic building block of the application type module struct { // dependencies of this module deps []string // initFn for this module (can return nil) - initFn service + initFn func() (services.Service, error) } // Manager is a component that initialises modules of the application @@ -31,31 +29,29 @@ func NewManager() *Manager { } } -// RegisterModule registers a new module with ModuleManager -func (m *Manager) RegisterModule(name string, initFn service) { +// RegisterModule registers a new module with name and init function +// name must be unique to avoid overwriting modules +// if initFn is nil, the module will not initialise +func (m *Manager) RegisterModule(name string, initFn func() (services.Service, error)) { m.modules[name] = module{ initFn: initFn, } - return } // AddDependency adds a dependency from name(source) to dependsOn(targets) +// An error is returned if the source module name is not found func (m *Manager) AddDependency(name string, dependsOn ...string) error { if mod, ok := m.modules[name]; ok { - for _, dep := range dependsOn { - if _, ok := m.modules[dep]; ok { - mod.deps = append(mod.deps, dep) - } else { - return fmt.Errorf("no such module: %s", dep) - } - } + mod.deps = append(mod.deps, dependsOn...) } else { return fmt.Errorf("no such module: %s", name) } return nil } -// InitModuleServices starts the target module +// InitModuleServices initalises the target module by initalising all its dependencies +// in the right order. Modules are wrapped in such a way that they start after their +// dependencies have been started and stop before their dependencies are stopped. func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { servicesMap := map[string]services.Service{} @@ -76,8 +72,8 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service invDeps := m.findInverseDependencies(n, deps[ix+1:]) if s == nil { - if invDeps != nil { - return nil, fmt.Errorf("module %s returned nil service but has other modules dependent on it", n) + if len(invDeps) > 0 { + return nil, fmt.Errorf("module %s returned nil service but has other modules dependent on it: %v", n, invDeps) } } else { // We pass servicesMap, which isn't yet complete. By the time service starts, From d34c570b898b9cc9c1369eedebf3c5e971d2f223 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 8 May 2020 13:27:12 +0530 Subject: [PATCH 05/11] Cleanup cortex_test.go Signed-off-by: Annanay --- pkg/cortex/cortex_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index d8aaec8fc13..d704bb80255 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -64,7 +64,4 @@ func TestCortex(t *testing.T) { require.NotNil(t, c.ServiceMap[Ingester]) require.NotNil(t, c.ServiceMap[Ring]) require.NotNil(t, c.ServiceMap[Distributor]) - - // check that findInverseDependencie for Ring -- querier and distributor depend on Ring, so should be returned. - // require.ElementsMatch(t, []ModuleName{Distributor, Querier}, findInverseDependencies(Ring, modules[cfg.Target].deps)) } From cbf40a8dae9efbfe3a2c36ca7536b74e709c9757 Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 8 May 2020 13:44:19 +0530 Subject: [PATCH 06/11] Fix module test Signed-off-by: Annanay --- pkg/cortex/modules.go | 2 +- pkg/util/modules/modules.go | 6 +++--- pkg/util/modules/modules_test.go | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f5b1d31d6b3..a02df5f9100 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -543,5 +543,5 @@ func (t *Cortex) setupModuleManager() error { t.ModuleManager = mm - return mm + return nil } diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index f72f8c6228b..d9340477e7d 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -19,13 +19,13 @@ type module struct { // Manager is a component that initialises modules of the application // in the right order of dependencies. type Manager struct { - modules map[string]module + modules map[string]*module } // NewManager creates a new Manager func NewManager() *Manager { return &Manager{ - modules: make(map[string]module), + modules: make(map[string]*module), } } @@ -33,7 +33,7 @@ func NewManager() *Manager { // name must be unique to avoid overwriting modules // if initFn is nil, the module will not initialise func (m *Manager) RegisterModule(name string, initFn func() (services.Service, error)) { - m.modules[name] = module{ + m.modules[name] = &module{ initFn: initFn, } } diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index e0ea36123e4..a4d3062c026 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -1,6 +1,7 @@ package modules import ( + "fmt" "testing" "github.com/cortexproject/cortex/pkg/util/services" @@ -38,6 +39,6 @@ func TestDependencies(t *testing.T) { assert.Equal(t, invDeps[0], "serviceB") svcs, err := mm.InitModuleServices("serviceC") - assert.NotNil(t, svcs) - assert.NoError(t, err) + assert.Nil(t, svcs) + assert.Error(t, err, fmt.Errorf("module serviceA returned nil service but has other modules dependent on it: [serviceB]")) } From acabcee3327898f8aec6a70bc326d984b8e4fdef Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 11 May 2020 12:49:25 +0530 Subject: [PATCH 07/11] Checkpoint Signed-off-by: Annanay --- pkg/cortex/cortex_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index d704bb80255..d484675f9ae 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -52,16 +52,19 @@ func TestCortex(t *testing.T) { c, err := New(cfg) require.NoError(t, err) - require.NotNil(t, c.ServiceMap) - for m, s := range c.ServiceMap { + serviceMap, err := c.ModuleManager.InitModuleServices(c.Cfg.Target) + require.NoError(t, err) + require.NotNil(t, serviceMap) + + for m, s := range serviceMap { // make sure each service is still New require.Equal(t, services.New, s.State(), "module: %s", m) } // check random modules that we expect to be configured when using Target=All - require.NotNil(t, c.ServiceMap[Server]) - require.NotNil(t, c.ServiceMap[Ingester]) - require.NotNil(t, c.ServiceMap[Ring]) - require.NotNil(t, c.ServiceMap[Distributor]) + require.NotNil(t, serviceMap[Server]) + require.NotNil(t, serviceMap[Ingester]) + require.NotNil(t, serviceMap[Ring]) + require.NotNil(t, serviceMap[Distributor]) } From 0afad1f71d3ceb341a81f054aba4c94aa2f66245 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 11 May 2020 19:05:34 +0530 Subject: [PATCH 08/11] Ignore inverse dependencies if module returns nil service Signed-off-by: Annanay --- pkg/util/modules/modules.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index d9340477e7d..33fb2477770 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -70,16 +70,9 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) } - invDeps := m.findInverseDependencies(n, deps[ix+1:]) - if s == nil { - if len(invDeps) > 0 { - return nil, fmt.Errorf("module %s returned nil service but has other modules dependent on it: %v", n, invDeps) - } - } else { - // We pass servicesMap, which isn't yet complete. By the time service starts, - // it will be fully built, so there is no need for extra synchronization. - serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, invDeps) - } + // We pass servicesMap, which isn't yet complete. By the time service starts, + // it will be fully built, so there is no need for extra synchronization. + serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) } if serv != nil { From 90e606c3a33e42004985cda976e6f950e33d5ad7 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 11 May 2020 20:19:44 +0530 Subject: [PATCH 09/11] Lint, fix tests Signed-off-by: Annanay --- pkg/util/modules/modules.go | 5 +++-- pkg/util/modules/modules_test.go | 8 ++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 33fb2477770..d34e53ae82e 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -3,8 +3,9 @@ package modules import ( "fmt" - "github.com/cortexproject/cortex/pkg/util/services" "github.com/pkg/errors" + + "github.com/cortexproject/cortex/pkg/util/services" ) // module is the basic building block of the application @@ -49,7 +50,7 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { return nil } -// InitModuleServices initalises the target module by initalising all its dependencies +// InitModuleServices initialises the target module by initialising all its dependencies // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index a4d3062c026..0dec6d09a8b 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -1,12 +1,12 @@ package modules import ( - "fmt" "testing" - "github.com/cortexproject/cortex/pkg/util/services" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util/services" ) func mockInitFunc() (services.Service, error) { return nil, nil } @@ -39,6 +39,6 @@ func TestDependencies(t *testing.T) { assert.Equal(t, invDeps[0], "serviceB") svcs, err := mm.InitModuleServices("serviceC") - assert.Nil(t, svcs) - assert.Error(t, err, fmt.Errorf("module serviceA returned nil service but has other modules dependent on it: [serviceB]")) + assert.NotNil(t, svcs) + assert.NoError(t, err) } From 8df928ea82f2b6343dc61251726d0ba543e5dc31 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 11 May 2020 23:23:01 +0530 Subject: [PATCH 10/11] Fix tests for unknown module name Signed-off-by: Annanay --- cmd/cortex/main_test.go | 22 ---------------------- pkg/util/modules/modules.go | 3 +++ pkg/util/modules/modules_test.go | 5 +++++ 3 files changed, 8 insertions(+), 22 deletions(-) diff --git a/cmd/cortex/main_test.go b/cmd/cortex/main_test.go index 75ac3726044..923c43a25a1 100644 --- a/cmd/cortex/main_test.go +++ b/cmd/cortex/main_test.go @@ -26,17 +26,6 @@ func TestFlagParsing(t *testing.T) { stderrMessage: configFileOption, }, - // check that config file is used - "config with unknown target": { - yaml: "target: unknown", - stderrMessage: "unrecognised module name: unknown", - }, - - "argument with unknown target": { - arguments: []string{"-target=unknown"}, - stderrMessage: "unrecognised module name: unknown", - }, - "unknown flag": { arguments: []string{"-unknown.flag"}, stderrMessage: "-unknown.flag", @@ -48,12 +37,6 @@ func TestFlagParsing(t *testing.T) { stdoutMessage: "target: ingester", }, - "config with wrong argument override": { - yaml: "target: ingester", - arguments: []string{"-target=unknown"}, - stderrMessage: "unrecognised module name: unknown", - }, - "default values": { stdoutMessage: "target: all\n", }, @@ -63,11 +46,6 @@ func TestFlagParsing(t *testing.T) { stdoutMessage: "target: ingester\n", }, - "config without expand-env": { - yaml: "target: $TARGET", - stderrMessage: "Error parsing config file: unrecognised module name: $TARGET\n", - }, - "config with expand-env": { arguments: []string{"-config.expand-env"}, yaml: "target: $TARGET", diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index d34e53ae82e..6cc6f9911ff 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -54,6 +54,9 @@ func (m *Manager) AddDependency(name string, dependsOn ...string) error { // in the right order. Modules are wrapped in such a way that they start after their // dependencies have been started and stop before their dependencies are stopped. func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) { + if _, ok := m.modules[target]; !ok { + return nil, fmt.Errorf("unrecognised module name: %s", target) + } servicesMap := map[string]services.Service{} // initialize all of our dependencies first diff --git a/pkg/util/modules/modules_test.go b/pkg/util/modules/modules_test.go index 0dec6d09a8b..3812779c2d2 100644 --- a/pkg/util/modules/modules_test.go +++ b/pkg/util/modules/modules_test.go @@ -1,6 +1,7 @@ package modules import ( + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -41,4 +42,8 @@ func TestDependencies(t *testing.T) { svcs, err := mm.InitModuleServices("serviceC") assert.NotNil(t, svcs) assert.NoError(t, err) + + svcs, err = mm.InitModuleServices("service_unknown") + assert.Nil(t, svcs) + assert.Error(t, err, fmt.Errorf("unrecognised module name: service_unknown")) } From 0f8f5130430efd1234434ff3e1ceb7d9f2a901a4 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 14 May 2020 15:50:48 +0530 Subject: [PATCH 11/11] Create module service wrapper only if service is not nil Signed-off-by: Annanay --- pkg/util/modules/modules.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/util/modules/modules.go b/pkg/util/modules/modules.go index 6cc6f9911ff..b021227c42f 100644 --- a/pkg/util/modules/modules.go +++ b/pkg/util/modules/modules.go @@ -74,9 +74,11 @@ func (m *Manager) InitModuleServices(target string) (map[string]services.Service return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n)) } - // We pass servicesMap, which isn't yet complete. By the time service starts, - // it will be fully built, so there is no need for extra synchronization. - serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) + if s != nil { + // We pass servicesMap, which isn't yet complete. By the time service starts, + // it will be fully built, so there is no need for extra synchronization. + serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:])) + } } if serv != nil {