diff --git a/CHANGELOG.md b/CHANGELOG.md index cc58a87b22f..84467d7c45c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,11 +12,13 @@ * [CHANGE] Use relative links from /ring page to make it work when used behind reverse proxy. #1896 * [CHANGE] Deprecated `-distributor.limiter-reload-period` flag. #1766 * [CHANGE] Ingesters now write only normalised tokens to the ring, although they can still read denormalised tokens used by other ingesters. `-ingester.normalise-tokens` is now deprecated, and ignored. If you want to switch back to using denormalised tokens, you need to downgrade to Cortex 0.4.0. Previous versions don't handle claiming tokens from normalised ingesters correctly. #1809 +* [CHANGE] Overrides mechanism has been renamed to "runtime config", and is now separate from limits. Runtime config is simply a file that is reloaded by Cortex every couple of seconds. Limits and now also multi KV use this mechanism.
New arguments were introduced: `-runtime-config.file` (defaults to empty) and `-runtime-config.reload-period` (defaults to 10 seconds), which replace previously used `-limits.per-user-override-config` and `-limits.per-user-override-period` options. Old options are still used if `-runtime-config.file` is not specified. This change is also reflected in YAML configuration, where old `limits.per_tenant_override_config` and `limits.per_tenant_override_period` fields are replaced with `runtime_config.file` and `runtime_config.period` respectively. #1749 * [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726 * [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766 * [FEATURE] Added support for Microsoft Azure blob storage to be used for storing chunk data. #1913 * [FEATURE] Added readiness probe endpoint`/ready` to queriers. #1934 * [FEATURE] EXPERIMENTAL: Added `/series` API endpoint support with TSDB blocks storage. #1830 +* [FEATURE] Added "multi" KV store that can interact with two other KV stores, primary one for all reads and writes, and secondary one, which only receives writes. Primary/secondary store can be modified in runtime via runtime-config mechanism (previously "overrides"). #1749 * [ENHANCEMENT] Added `password` and `enable_tls` options to redis cache configuration. Enables usage of Microsoft Azure Cache for Redis service. * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861 * [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 3defae19b22..5cff13cd705 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -122,7 +122,7 @@ The KVStore client is used by both the Ring and HA Tracker. - `{ring,distributor.ha-tracker}.prefix` The prefix for the keys in the store. Should end with a /. For example with a prefix of foo/, the key bar would be stored under foo/bar. - `{ring,distributor.ha-tracker}.store` - Backend storage to use for the ring (consul, etcd, inmemory). + Backend storage to use for the ring (consul, etcd, inmemory, memberlist, multi). #### Consul @@ -182,6 +182,35 @@ Flags for configuring KV store based on memberlist library. This feature is expe Timeout for writing 'packet' data. - `memberlist.transport-debug` Log debug transport messages. Note: global log.level must be at debug level as well. + +#### Multi KV + +This is a special key-value implementation that uses two different KV stores (eg. consul, etcd or memberlist). One of them is always marked as primary, and all reads and writes go to primary store. Other one, secondary, is only used for writes. The idea is that operator can use multi KV store to migrate from primary to secondary store in runtime. + +For example, migration from Consul to Etcd would look like this: + +- Set `ring.store` to use `multi` store. Set `-multi.primary=consul` and `-multi.secondary=etcd`. All consul and etcd settings must still be specified. +- Start all Cortex microservices. They will still use Consul as primary KV, but they will also write share ring via etcd. +- Operator can now use "runtime config" mechanism to switch primary store to etcd. +- After all Cortex microservices have picked up new primary store, and everything looks correct, operator can now shut down Consul, and modify Cortex configuration to use `-ring.store=etcd` only. +- At this point, Consul can be shut down. + +Multi KV has following parameters: + +- `multi.primary` - name of primary KV store. Same values as in `ring.store` are supported, except `multi`. +- `multi.secondary` - name of secondary KV store. +- `multi.mirror-enabled` - enable mirroring of values to secondary store, defaults to true +- `multi.mirror-timeout` - wait max this time to write to secondary store to finish. Default to 2 seconds. Errors writing to secondary store are not reported to caller, but are logged and also reported via `cortex_multikv_mirror_write_errors_total` metric. + +Multi KV also reacts on changes done via runtime configuration. It uses this section: + +```yaml +multi_kv_config: + mirror-enabled: false + primary: memberlist +``` + +Note that runtime configuration values take precedence over command line options. ### HA Tracker @@ -276,11 +305,13 @@ It also talks to a KVStore and has it's own copies of the same flags used by the Where you don't want to cache every chunk written by ingesters, but you do want to take advantage of chunk write deduplication, this option will make ingesters write a placeholder to the cache for each chunk. Make sure you configure ingesters with a different cache to queriers, which need the whole value. -## Ingester, Distributor & Querier limits. +## Runtime Configuration file -Cortex implements various limits on the requests it can process, in order to prevent a single tenant overwhelming the cluster. There are various default global limits which apply to all tenants which can be set on the command line. These limits can also be overridden on a per-tenant basis, using a configuration file. Specify the filename for the override configuration file using the `-limits.per-user-override-config=` flag. The override file will be re-read every 10 seconds by default - this can also be controlled using the `-limits.per-user-override-period=10s` flag. +Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=` is not specified. -The override file should be in YAML format and contain a single `overrides` field, which itself is a map of tenant ID (same values as passed in the `X-Scope-OrgID` header) to the various limits. An example `overrides.yml` could look like: +At the moment, two components use runtime configuration: limits and multi KV store. + +Example runtime configuration file: ```yaml overrides: @@ -292,11 +323,33 @@ overrides: max_samples_per_query: 1000000 max_series_per_metric: 100000 max_series_per_query: 100000 + +multi_kv_config: + mirror-enabled: false + primary: memberlist ``` When running Cortex on Kubernetes, store this file in a config map and mount it in each services' containers. When changing the values there is no need to restart the services, unless otherwise specified. -Valid fields are (with their corresponding flags for default values): +## Ingester, Distributor & Querier limits. + +Cortex implements various limits on the requests it can process, in order to prevent a single tenant overwhelming the cluster. There are various default global limits which apply to all tenants which can be set on the command line. These limits can also be overridden on a per-tenant basis by using `overrides` field of runtime configuration file. + +The `overrides` field is a map of tenant ID (same values as passed in the `X-Scope-OrgID` header) to the various limits. An example could look like: + +```yaml +overrides: + tenant1: + ingestion_rate: 10000 + max_series_per_metric: 100000 + max_series_per_query: 100000 + tenant2: + max_samples_per_query: 1000000 + max_series_per_metric: 100000 + max_series_per_query: 100000 +``` + +Valid per-tenant limits are (with their corresponding flags for default values): - `ingestion_rate_strategy` / `-distributor.ingestion-rate-limit-strategy` - `ingestion_rate` / `-distributor.ingestion-rate-limit` diff --git a/go.mod b/go.mod index 5359f73f094..ee8afc3bd45 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( github.com/thanos-io/thanos v0.8.1-0.20200102143048-a37ac093a67a github.com/tinylib/msgp v0.0.0-20161221055906-38a6f61a768d // indirect github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect + github.com/uber-go/atomic v1.4.0 github.com/uber/jaeger-client-go v2.20.1+incompatible github.com/weaveworks/billing-client v0.0.0-20171006123215-be0d55e547b1 github.com/weaveworks/common v0.0.0-20190822150010-afb9996716e4 diff --git a/go.sum b/go.sum index f9d70c269dd..c834e13df9d 100644 --- a/go.sum +++ b/go.sum @@ -750,6 +750,8 @@ github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o= +github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g= github.com/uber/jaeger-client-go v2.20.1+incompatible h1:HgqpYBng0n7tLJIlyT4kPCIv5XgCsF+kai1NnnrJzEU= github.com/uber/jaeger-client-go v2.20.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 80632775a26..14f4fae8d83 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -88,7 +88,7 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto var limits validation.Limits flagext.DefaultValues(&limits) limits.MaxQueryLength = 30 * 24 * time.Hour - overrides, err := validation.NewOverrides(limits) + overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) store := NewCompositeStore() diff --git a/pkg/chunk/storage/caching_fixtures.go b/pkg/chunk/storage/caching_fixtures.go index 47e90fe44e5..ece80430986 100644 --- a/pkg/chunk/storage/caching_fixtures.go +++ b/pkg/chunk/storage/caching_fixtures.go @@ -41,5 +41,5 @@ func defaultLimits() (*validation.Overrides, error) { var defaults validation.Limits flagext.DefaultValues(&defaults) defaults.CardinalityLimit = 5 - return validation.NewOverrides(defaults) + return validation.NewOverrides(defaults, nil) } diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index 0b6737d94d0..2d680250cba 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -30,7 +30,7 @@ func TestFactoryStop(t *testing.T) { }, } - limits, err := validation.NewOverrides(defaults) + limits, err := validation.NewOverrides(defaults, nil) require.NoError(t, err) store, err := NewStore(cfg, storeConfig, schemaConfig, limits) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 1fe46fa5271..87d6f321b1d 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -31,6 +31,7 @@ import ( "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -75,10 +76,11 @@ type Config struct { Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. TSDB tsdb.Config `yaml:"tsdb"` - Ruler ruler.Config `yaml:"ruler,omitempty"` - ConfigDB db.Config `yaml:"configdb,omitempty"` - ConfigStore config_client.Config `yaml:"config_store,omitempty"` - Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"` + Ruler ruler.Config `yaml:"ruler,omitempty"` + ConfigDB db.Config `yaml:"configdb,omitempty"` + ConfigStore config_client.Config `yaml:"config_store,omitempty"` + Alertmanager alertmanager.MultitenantAlertmanagerConfig `yaml:"alertmanager,omitempty"` + RuntimeConfig runtimeconfig.ManagerConfig `yaml:"runtime_config,omitempty"` } // RegisterFlags registers flag. @@ -112,6 +114,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.ConfigDB.RegisterFlags(f) c.ConfigStore.RegisterFlagsWithPrefix("alertmanager.", f) c.Alertmanager.RegisterFlags(f) + c.RuntimeConfig.RegisterFlags(f) // These don't seem to have a home. flag.IntVar(&chunk_util.QueryParallelism, "querier.query-parallelism", 100, "Max subqueries run in parallel per higher-level query.") @@ -146,16 +149,17 @@ type Cortex struct { target moduleName httpAuthMiddleware middleware.Interface - server *server.Server - ring *ring.Ring - overrides *validation.Overrides - distributor *distributor.Distributor - ingester *ingester.Ingester - store chunk.Store - worker frontend.Worker - frontend *frontend.Frontend - tableManager *chunk.TableManager - cache cache.Cache + server *server.Server + ring *ring.Ring + overrides *validation.Overrides + distributor *distributor.Distributor + ingester *ingester.Ingester + store chunk.Store + worker frontend.Worker + frontend *frontend.Frontend + tableManager *chunk.TableManager + cache cache.Cache + runtimeConfig *runtimeconfig.Manager ruler *ruler.Ruler configAPI *api.API diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 36288b4d373..ccf2735838e 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -32,6 +32,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -40,6 +41,7 @@ type moduleName int // The various modules that make up Cortex. const ( Ring moduleName = iota + RuntimeConfig Overrides Server Distributor @@ -58,6 +60,8 @@ func (m moduleName) String() string { switch m { case Ring: return "ring" + case RuntimeConfig: + return "runtime-config" case Overrides: return "overrides" case Server: @@ -152,6 +156,7 @@ func (t *Cortex) stopServer() (err error) { } func (t *Cortex) initRing(cfg *Config) (err error) { + cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.ring, err = ring.New(cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey) if err != nil { return @@ -161,16 +166,30 @@ func (t *Cortex) initRing(cfg *Config) (err error) { return } -func (t *Cortex) initOverrides(cfg *Config) (err error) { - t.overrides, err = validation.NewOverrides(cfg.LimitsConfig) +func (t *Cortex) initRuntimeConfig(cfg *Config) (err error) { + if cfg.RuntimeConfig.LoadPath == "" { + cfg.RuntimeConfig.LoadPath = cfg.LimitsConfig.PerTenantOverrideConfig + cfg.RuntimeConfig.ReloadPeriod = cfg.LimitsConfig.PerTenantOverridePeriod + } + cfg.RuntimeConfig.Loader = loadRuntimeConfig + + // make sure to set default limits before we start loading configuration into memory + validation.SetDefaultLimitsForYAMLUnmarshalling(cfg.LimitsConfig) + + t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(cfg.RuntimeConfig) return err } -func (t *Cortex) stopOverrides() error { - t.overrides.Stop() +func (t *Cortex) stopRuntimeConfig() (err error) { + t.runtimeConfig.Stop() return nil } +func (t *Cortex) initOverrides(cfg *Config) (err error) { + t.overrides, err = validation.NewOverrides(cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig)) + return err +} + func (t *Cortex) initDistributor(cfg *Config) (err error) { cfg.Distributor.DistributorRing.ListenPort = cfg.Server.GRPCListenPort @@ -257,6 +276,7 @@ func (t *Cortex) stopQuerier() error { } func (t *Cortex) initIngester(cfg *Config) (err error) { + cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) cfg.Ingester.LifecyclerConfig.ListenPort = &cfg.Server.GRPCListenPort cfg.Ingester.TSDBEnabled = cfg.Storage.Engine == storage.StorageEngineTSDB cfg.Ingester.TSDBConfig = cfg.TSDB @@ -446,14 +466,19 @@ var modules = map[moduleName]module{ stop: (*Cortex).stopServer, }, + RuntimeConfig: { + init: (*Cortex).initRuntimeConfig, + stop: (*Cortex).stopRuntimeConfig, + }, + Ring: { - deps: []moduleName{Server}, + deps: []moduleName{Server, RuntimeConfig}, init: (*Cortex).initRing, }, Overrides: { + deps: []moduleName{RuntimeConfig}, init: (*Cortex).initOverrides, - stop: (*Cortex).stopOverrides, }, Distributor: { @@ -469,7 +494,7 @@ var modules = map[moduleName]module{ }, Ingester: { - deps: []moduleName{Overrides, Store, Server}, + deps: []moduleName{Overrides, Store, Server, RuntimeConfig}, init: (*Cortex).initIngester, stop: (*Cortex).stopIngester, }, diff --git a/pkg/cortex/runtime_config.go b/pkg/cortex/runtime_config.go new file mode 100644 index 00000000000..8f914839720 --- /dev/null +++ b/pkg/cortex/runtime_config.go @@ -0,0 +1,72 @@ +package cortex + +import ( + "os" + + "gopkg.in/yaml.v2" + + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/util/runtimeconfig" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// runtimeConfigValues are values that can be reloaded from configuration file while Cortex is running. +// Reloading is done by runtime_config.Manager, which also keeps the currently loaded config. +// These values are then pushed to the components that are interested in them. +type runtimeConfigValues struct { + TenantLimits map[string]*validation.Limits `yaml:"overrides"` + + Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` +} + +func loadRuntimeConfig(filename string) (interface{}, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + + var overrides = &runtimeConfigValues{} + + decoder := yaml.NewDecoder(f) + decoder.SetStrict(true) + if err := decoder.Decode(&overrides); err != nil { + return nil, err + } + + return overrides, nil +} + +func tenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLimits { + return func(userID string) *validation.Limits { + cfg, ok := c.GetConfig().(*runtimeConfigValues) + if !ok || cfg == nil { + return nil + } + + return cfg.TenantLimits[userID] + } +} + +func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-chan kv.MultiRuntimeConfig { + // returns function that can be used in MultiConfig.ConfigProvider + return func() <-chan kv.MultiRuntimeConfig { + outCh := make(chan kv.MultiRuntimeConfig, 1) + + // push initial config to the channel + val := manager.GetConfig() + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + outCh <- cfg.Multi + } + + ch := manager.CreateListenerChannel(1) + go func() { + for val := range ch { + if cfg, ok := val.(*runtimeConfigValues); ok && cfg != nil { + outCh <- cfg.Multi + } + } + }() + + return outCh + } +} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index dc40c603817..0de0548c878 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -756,7 +756,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur cfg.DistributorRing.KVStore.Mock = kvStore cfg.DistributorRing.InstanceAddr = "127.0.0.1" - overrides, err := validation.NewOverrides(*limits) + overrides, err := validation.NewOverrides(*limits, nil) require.NoError(t, err) d, err := New(cfg, clientConfig, overrides, ingestersRing, true) diff --git a/pkg/distributor/ingestion_rate_strategy_test.go b/pkg/distributor/ingestion_rate_strategy_test.go index 6004279a305..14bb7445061 100644 --- a/pkg/distributor/ingestion_rate_strategy_test.go +++ b/pkg/distributor/ingestion_rate_strategy_test.go @@ -61,7 +61,7 @@ func TestIngestionRateStrategy(t *testing.T) { var strategy limiter.RateLimiterStrategy // Init limits overrides - overrides, err := validation.NewOverrides(testData.limits) + overrides, err := validation.NewOverrides(testData.limits, nil) require.NoError(t, err) // Instance the strategy diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index f9574405bd8..93f8897aebb 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -41,7 +41,7 @@ func newTestStore(t require.TestingT, cfg Config, clientConfig client.Config, li store := &testStore{ chunks: map[string][]chunk.Chunk{}, } - overrides, err := validation.NewOverrides(limits) + overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) ing, err := New(cfg, clientConfig, overrides, store, nil) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index b62ee10d580..1c04a5811fa 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -715,7 +715,7 @@ func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Re clientCfg := defaultClientTestConfig() limits := defaultLimitsTestConfig() - overrides, err := validation.NewOverrides(limits) + overrides, err := validation.NewOverrides(limits, nil) if err != nil { return nil, nil, err } diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 7595d5d3450..1b0e97954b1 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -93,7 +93,7 @@ func TestIngesterRestart(t *testing.T) { } func TestIngesterTransfer(t *testing.T) { - limits, err := validation.NewOverrides(defaultLimitsTestConfig()) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) // Start the first ingester, and get it into ACTIVE state. @@ -158,7 +158,7 @@ func TestIngesterTransfer(t *testing.T) { } func TestIngesterBadTransfer(t *testing.T) { - limits, err := validation.NewOverrides(defaultLimitsTestConfig()) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) // Start ingester in PENDING. @@ -410,7 +410,7 @@ func TestV2IngesterTransfer(t *testing.T) { // We run the same under different scenarios for name, scenario := range scenarios { t.Run(name, func(t *testing.T) { - limits, err := validation.NewOverrides(defaultLimitsTestConfig()) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) dir1, err := ioutil.TempDir("", "tsdb") diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go index 6400f3bdfc7..5af41e9b340 100644 --- a/pkg/ingester/limiter_test.go +++ b/pkg/ingester/limiter_test.go @@ -90,7 +90,7 @@ func TestSeriesLimit_maxSeriesPerMetric(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{ MaxLocalSeriesPerMetric: testData.maxLocalSeriesPerMetric, MaxGlobalSeriesPerMetric: testData.maxGlobalSeriesPerMetric, - }) + }, nil) require.NoError(t, err) limiter := NewSeriesLimiter(limits, ring, testData.ringReplicationFactor, testData.shardByAllLabels) @@ -180,7 +180,7 @@ func TestSeriesLimit_maxSeriesPerUser(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{ MaxLocalSeriesPerUser: testData.maxLocalSeriesPerUser, MaxGlobalSeriesPerUser: testData.maxGlobalSeriesPerUser, - }) + }, nil) require.NoError(t, err) limiter := NewSeriesLimiter(limits, ring, testData.ringReplicationFactor, testData.shardByAllLabels) @@ -242,7 +242,7 @@ func TestSeriesLimiter_AssertMaxSeriesPerMetric(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{ MaxLocalSeriesPerMetric: testData.maxLocalSeriesPerMetric, MaxGlobalSeriesPerMetric: testData.maxGlobalSeriesPerMetric, - }) + }, nil) require.NoError(t, err) limiter := NewSeriesLimiter(limits, ring, testData.ringReplicationFactor, testData.shardByAllLabels) @@ -304,7 +304,7 @@ func TestSeriesLimiter_AssertMaxSeriesPerUser(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{ MaxLocalSeriesPerUser: testData.maxLocalSeriesPerUser, MaxGlobalSeriesPerUser: testData.maxGlobalSeriesPerUser, - }) + }, nil) require.NoError(t, err) limiter := NewSeriesLimiter(limits, ring, testData.ringReplicationFactor, testData.shardByAllLabels) diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index cb18146d235..b1d1ff65448 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -136,7 +136,7 @@ func TestFrontendCancel(t *testing.T) { func defaultOverrides(t *testing.T) *validation.Overrides { var limits validation.Limits flagext.DefaultValues(&limits) - overrides, err := validation.NewOverrides(limits) + overrides, err := validation.NewOverrides(limits, nil) require.NoError(t, err) return overrides } diff --git a/pkg/ring/kv/client.go b/pkg/ring/kv/client.go index ba3317365cc..b31bbc99832 100644 --- a/pkg/ring/kv/client.go +++ b/pkg/ring/kv/client.go @@ -20,16 +20,24 @@ import ( var inmemoryStoreInit sync.Once var inmemoryStore Client -// Config is config for a KVStore currently used by ring and HA tracker, -// where store can be consul or inmemory. -type Config struct { - Store string `yaml:"store,omitempty"` +// StoreConfig is a configuration used for building single store client, either +// Consul, Etcd, Memberlist or MultiClient. It was extracted from Config to keep +// single-client config separate from final client-config (with all the wrappers) +type StoreConfig struct { Consul consul.Config `yaml:"consul,omitempty"` Etcd etcd.Config `yaml:"etcd,omitempty"` Memberlist memberlist.Config `yaml:"memberlist,omitempty"` - Prefix string `yaml:"prefix,omitempty"` + Multi MultiConfig `yaml:"multi,omitempty"` +} + +// Config is config for a KVStore currently used by ring and HA tracker, +// where store can be consul or inmemory. +type Config struct { + Store string `yaml:"store,omitempty"` + Prefix string `yaml:"prefix,omitempty"` + StoreConfig `yaml:",inline"` - Mock Client + Mock Client `yaml:"-"` } // RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet. @@ -44,12 +52,14 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { // be easier to have everything under ring, so ring.consul. cfg.Consul.RegisterFlags(f, prefix) cfg.Etcd.RegisterFlagsWithPrefix(f, prefix) + cfg.Multi.RegisterFlagsWithPrefix(f, prefix) cfg.Memberlist.RegisterFlags(f, prefix) + if prefix == "" { prefix = "ring." } f.StringVar(&cfg.Prefix, prefix+"prefix", "collectors/", "The prefix for the keys in the store. Should end with a /.") - f.StringVar(&cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, etcd, inmemory, memberlist [experimental]).") + f.StringVar(&cfg.Store, prefix+"store", "consul", "Backend storage to use for the ring (consul, etcd, inmemory, multi, memberlist [experimental]).") } // Client is a high-level client for key-value stores (such as Etcd and @@ -86,10 +96,14 @@ func NewClient(cfg Config, codec codec.Codec) (Client, error) { return cfg.Mock, nil } + return createClient(cfg.Store, cfg.Prefix, cfg.StoreConfig, codec) +} + +func createClient(name string, prefix string, cfg StoreConfig, codec codec.Codec) (Client, error) { var client Client var err error - switch cfg.Store { + switch name { case "consul": client, err = consul.NewClient(cfg.Consul, codec) @@ -108,17 +122,49 @@ func NewClient(cfg Config, codec codec.Codec) (Client, error) { cfg.Memberlist.MetricsRegisterer = prometheus.DefaultRegisterer client, err = memberlist.NewMemberlistClient(cfg.Memberlist, codec) + case "multi": + client, err = buildMultiClient(cfg, codec) + default: - return nil, fmt.Errorf("invalid KV store type: %s", cfg.Store) + return nil, fmt.Errorf("invalid KV store type: %s", name) } if err != nil { return nil, err } - if cfg.Prefix != "" { - client = PrefixClient(client, cfg.Prefix) + if prefix != "" { + client = PrefixClient(client, prefix) } return metrics{client}, nil } + +func buildMultiClient(cfg StoreConfig, codec codec.Codec) (Client, error) { + if cfg.Multi.Primary == "" || cfg.Multi.Secondary == "" { + return nil, fmt.Errorf("primary or secondary store not set") + } + if cfg.Multi.Primary == "multi" || cfg.Multi.Secondary == "multi" { + return nil, fmt.Errorf("primary and secondary stores cannot be multi-stores") + } + if cfg.Multi.Primary == cfg.Multi.Secondary { + return nil, fmt.Errorf("primary and secondary stores must be different") + } + + primary, err := createClient(cfg.Multi.Primary, "", cfg, codec) + if err != nil { + return nil, err + } + + secondary, err := createClient(cfg.Multi.Secondary, "", cfg, codec) + if err != nil { + return nil, err + } + + clients := []kvclient{ + {client: primary, name: cfg.Multi.Primary}, + {client: secondary, name: cfg.Multi.Secondary}, + } + + return NewMultiClient(cfg.Multi, clients), nil +} diff --git a/pkg/ring/kv/client_test.go b/pkg/ring/kv/client_test.go new file mode 100644 index 00000000000..9800ea3c1c0 --- /dev/null +++ b/pkg/ring/kv/client_test.go @@ -0,0 +1,31 @@ +package kv + +import ( + "testing" + + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func TestParseConfig(t *testing.T) { + conf := ` +store: consul +consul: + host: "consul:8500" + consistentreads: true +prefix: "test/" +multi: + primary: consul + secondary: etcd +` + + cfg := Config{} + + err := yaml.Unmarshal([]byte(conf), &cfg) + require.NoError(t, err) + require.Equal(t, "consul", cfg.Store) + require.Equal(t, "test/", cfg.Prefix) + require.Equal(t, "consul:8500", cfg.Consul.Host) + require.Equal(t, "consul", cfg.Multi.Primary) + require.Equal(t, "etcd", cfg.Multi.Secondary) +} diff --git a/pkg/ring/kv/multi.go b/pkg/ring/kv/multi.go new file mode 100644 index 00000000000..8831540cd0f --- /dev/null +++ b/pkg/ring/kv/multi.go @@ -0,0 +1,361 @@ +package kv + +import ( + "context" + "flag" + "fmt" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/prometheus/client_golang/prometheus" + + "github.com/go-kit/kit/log/level" + "github.com/uber-go/atomic" +) + +var ( + primaryStoreGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_multikv_primary_store", + Help: "Selected primary KV store", + }, []string{"store"}) + + mirrorEnabledGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_multikv_mirror_enabled", + Help: "Is mirroring to secondary store enabled", + }) + + mirrorWritesCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_multikv_mirror_writes_total", + Help: "Number of mirror-writes to secondary store", + }) + + mirrorFailuresCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_multikv_mirror_write_errors_total", + Help: "Number of failures to mirror-write to secondary store", + }) +) + +func init() { + prometheus.MustRegister(primaryStoreGauge, mirrorEnabledGauge, mirrorWritesCounter, mirrorFailuresCounter) +} + +// MultiConfig is a configuration for MultiClient. +type MultiConfig struct { + Primary string `yaml:"primary"` + Secondary string `yaml:"secondary"` + + MirrorEnabled bool `yaml:"mirror_enabled"` + MirrorTimeout time.Duration `yaml:"mirror_timeout"` + + // ConfigProvider returns channel with MultiRuntimeConfig updates. + ConfigProvider func() <-chan MultiRuntimeConfig +} + +// RegisterFlagsWithPrefix registers flags with prefix. +func (cfg *MultiConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) { + f.StringVar(&cfg.Primary, prefix+"multi.primary", "", "Primary backend storage used by multi-client.") + f.StringVar(&cfg.Secondary, prefix+"multi.secondary", "", "Secondary backend storage used by multi-client.") + f.BoolVar(&cfg.MirrorEnabled, prefix+"multi.mirror-enabled", false, "Mirror writes to secondary store.") + f.DurationVar(&cfg.MirrorTimeout, prefix+"multi.mirror-timeout", 2*time.Second, "Timeout for storing value to secondary store.") +} + +// MultiRuntimeConfig has values that can change in runtime (via overrides) +type MultiRuntimeConfig struct { + // Primary store used by MultiClient. Can be updated in runtime to switch to a different store (eg. consul -> etcd, + // or to gossip). Doing this allows nice migration between stores. Empty values are ignored. + PrimaryStore string `yaml:"primary"` + + // Mirroring enabled or not. Nil = no change. + Mirroring *bool `yaml:"mirror_enabled"` +} + +type kvclient struct { + client Client + name string +} + +type clientInProgress struct { + client int + cancel context.CancelFunc +} + +// MultiClient implements kv.Client by forwarding all API calls to primary client. +// Writes performed via CAS method are also (optionally) forwarded to secondary clients. +type MultiClient struct { + // Available KV clients + clients []kvclient + + mirrorTimeout time.Duration + mirroringEnabled *atomic.Bool + + // logger with "multikv" component + logger log.Logger + + // The primary client used for interaction. + primaryID *atomic.Int32 + + cancel context.CancelFunc + + inProgressMu sync.Mutex + // Cancel functions for ongoing operations. key is a value from inProgressCnt. + // What we really need is a []context.CancelFunc, but functions cannot be compared against each other using ==, + // so we use this map instead. + inProgress map[int]clientInProgress + inProgressCnt int +} + +// NewMultiClient creates new MultiClient with given KV Clients. +// First client in the slice is the primary client. +func NewMultiClient(cfg MultiConfig, clients []kvclient) *MultiClient { + c := &MultiClient{ + clients: clients, + primaryID: atomic.NewInt32(0), + inProgress: map[int]clientInProgress{}, + + mirrorTimeout: cfg.MirrorTimeout, + mirroringEnabled: atomic.NewBool(cfg.MirrorEnabled), + + logger: log.With(util.Logger, "component", "multikv"), + } + + ctx, cancelFn := context.WithCancel(context.Background()) + c.cancel = cancelFn + + if cfg.ConfigProvider != nil { + go c.watchConfigChannel(ctx, cfg.ConfigProvider()) + } + + c.updatePrimaryStoreGauge() + c.updateMirrorEnabledGauge() + return c +} + +func (m *MultiClient) watchConfigChannel(ctx context.Context, configChannel <-chan MultiRuntimeConfig) { + for { + select { + case cfg, ok := <-configChannel: + if !ok { + return + } + + if cfg.Mirroring != nil { + enabled := *cfg.Mirroring + old := m.mirroringEnabled.Swap(enabled) + if old != enabled { + level.Info(m.logger).Log("msg", "toggled mirroring", "enabled", enabled) + } + m.updateMirrorEnabledGauge() + } + + if cfg.PrimaryStore != "" { + switched, err := m.setNewPrimaryClient(cfg.PrimaryStore) + if switched { + level.Info(m.logger).Log("msg", "switched primary KV store", "primary", cfg.PrimaryStore) + } + if err != nil { + level.Error(m.logger).Log("msg", "failed to switch primary KV store", "primary", cfg.PrimaryStore, "err", err) + } + } + + case <-ctx.Done(): + return + } + } +} + +func (m *MultiClient) getPrimaryClient() (int, kvclient) { + v := m.primaryID.Load() + return int(v), m.clients[v] +} + +// returns true, if primary client has changed +func (m *MultiClient) setNewPrimaryClient(store string) (bool, error) { + newPrimaryIx := -1 + for ix, c := range m.clients { + if c.name == store { + newPrimaryIx = ix + break + } + } + + if newPrimaryIx < 0 { + return false, fmt.Errorf("KV store not found") + } + + prev := int(m.primaryID.Swap(int32(newPrimaryIx))) + if prev == newPrimaryIx { + return false, nil + } + + defer m.updatePrimaryStoreGauge() // do as the last thing, after releasing the lock + + // switching to new primary... cancel clients using previous one + m.inProgressMu.Lock() + defer m.inProgressMu.Unlock() + + for _, inp := range m.inProgress { + if inp.client == prev { + inp.cancel() + } + } + return true, nil +} + +func (m *MultiClient) updatePrimaryStoreGauge() { + _, pkv := m.getPrimaryClient() + + for _, kv := range m.clients { + value := float64(0) + if pkv == kv { + value = 1 + } + + primaryStoreGauge.WithLabelValues(kv.name).Set(value) + } +} + +func (m *MultiClient) updateMirrorEnabledGauge() { + if m.mirroringEnabled.Load() { + mirrorEnabledGauge.Set(1) + } else { + mirrorEnabledGauge.Set(0) + } +} + +func (m *MultiClient) registerCancelFn(clientID int, fn context.CancelFunc) int { + m.inProgressMu.Lock() + defer m.inProgressMu.Unlock() + + m.inProgressCnt++ + id := m.inProgressCnt + m.inProgress[id] = clientInProgress{client: clientID, cancel: fn} + return id +} + +func (m *MultiClient) unregisterCancelFn(id int) { + m.inProgressMu.Lock() + defer m.inProgressMu.Unlock() + + delete(m.inProgress, id) +} + +// Runs supplied fn with current primary client. If primary client changes, fn is restarted. +// When fn finishes (with or without error), this method returns given error value. +func (m *MultiClient) runWithPrimaryClient(origCtx context.Context, fn func(newCtx context.Context, primary kvclient) error) error { + cancelFn := context.CancelFunc(nil) + cancelFnID := 0 + + cleanup := func() { + if cancelFn != nil { + cancelFn() + } + if cancelFnID > 0 { + m.unregisterCancelFn(cancelFnID) + } + } + + defer cleanup() + + // This only loops if switchover to a new primary backend happens while calling 'fn', which is very rare. + for { + cleanup() + pid, kv := m.getPrimaryClient() + + var cancelCtx context.Context + cancelCtx, cancelFn = context.WithCancel(origCtx) + cancelFnID = m.registerCancelFn(pid, cancelFn) + + err := fn(cancelCtx, kv) + + if err == nil { + return nil + } + + if cancelCtx.Err() == context.Canceled && origCtx.Err() == nil { + // our context was cancelled, but outer context is not done yet. retry + continue + } + + return err + } +} + +// Get is a part of kv.Client interface. +func (m *MultiClient) Get(ctx context.Context, key string) (interface{}, error) { + _, kv := m.getPrimaryClient() + val, err := kv.client.Get(ctx, key) + return val, err +} + +// CAS is a part of kv.Client interface. +func (m *MultiClient) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error { + _, kv := m.getPrimaryClient() + + updatedValue := interface{}(nil) + err := kv.client.CAS(ctx, key, func(in interface{}) (interface{}, bool, error) { + out, retry, err := f(in) + updatedValue = out + return out, retry, err + }) + + if err == nil && updatedValue != nil && m.mirroringEnabled.Load() { + m.writeToSecondary(ctx, kv, key, updatedValue) + } + + return err +} + +// WatchKey is a part of kv.Client interface. +func (m *MultiClient) WatchKey(ctx context.Context, key string, f func(interface{}) bool) { + _ = m.runWithPrimaryClient(ctx, func(newCtx context.Context, primary kvclient) error { + primary.client.WatchKey(newCtx, key, f) + return newCtx.Err() + }) +} + +// WatchPrefix is a part of kv.Client interface. +func (m *MultiClient) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) { + _ = m.runWithPrimaryClient(ctx, func(newCtx context.Context, primary kvclient) error { + primary.client.WatchPrefix(newCtx, prefix, f) + return newCtx.Err() + }) +} + +func (m *MultiClient) writeToSecondary(ctx context.Context, primary kvclient, key string, newValue interface{}) { + if m.mirrorTimeout > 0 { + var cfn context.CancelFunc + ctx, cfn = context.WithTimeout(ctx, m.mirrorTimeout) + defer cfn() + } + + // let's propagate new value to all remaining clients + for _, kvc := range m.clients { + if kvc == primary { + continue + } + + mirrorWritesCounter.Inc() + err := kvc.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) { + // try once + return newValue, false, nil + }) + + if err != nil { + mirrorFailuresCounter.Inc() + level.Warn(m.logger).Log("msg", "failed to update value in secondary store", "key", key, "err", err, "primary", primary.name, "secondary", kvc.name) + } else { + level.Debug(m.logger).Log("msg", "stored updated value to secondary store", "key", key, "primary", primary.name, "secondary", kvc.name) + } + } +} + +// Stop the multiClient and all configured clients. +func (m *MultiClient) Stop() { + m.cancel() + + for _, kv := range m.clients { + kv.client.Stop() + } +} diff --git a/pkg/ring/kv/multi_test.go b/pkg/ring/kv/multi_test.go new file mode 100644 index 00000000000..9385ed5ff8e --- /dev/null +++ b/pkg/ring/kv/multi_test.go @@ -0,0 +1,32 @@ +package kv + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v2" +) + +func boolPtr(b bool) *bool { + return &b +} + +func TestMultiRuntimeConfigWithVariousEnabledValues(t *testing.T) { + testcases := map[string]struct { + yaml string + expected *bool + }{ + "nil": {"primary: test", nil}, + "true": {"primary: test\nmirror_enabled: true", boolPtr(true)}, + "false": {"mirror_enabled: false", boolPtr(false)}, + } + + for name, tc := range testcases { + t.Run(name, func(t *testing.T) { + c := MultiRuntimeConfig{} + err := yaml.Unmarshal([]byte(tc.yaml), &c) + assert.NoError(t, err, tc.yaml) + assert.Equal(t, tc.expected, c.Mirroring, tc.yaml) + }) + } +} diff --git a/pkg/util/runtimeconfig/manager.go b/pkg/util/runtimeconfig/manager.go new file mode 100644 index 00000000000..a6b21857ea9 --- /dev/null +++ b/pkg/util/runtimeconfig/manager.go @@ -0,0 +1,172 @@ +package runtimeconfig + +import ( + "flag" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var overridesReloadSuccess = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_overrides_last_reload_successful", + Help: "Whether the last config reload attempt was successful.", +}) + +// Loader loads the configuration from file. +type Loader func(filename string) (interface{}, error) + +// ManagerConfig holds the config for an Manager instance. +// It holds config related to loading per-tenant config. +type ManagerConfig struct { + ReloadPeriod time.Duration `yaml:"period"` + LoadPath string `yaml:"file"` + Loader Loader +} + +// RegisterFlags registers flags. +func (mc *ManagerConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&mc.LoadPath, "runtime-config.file", "", "File with the configuration that can be updated in runtime.") + f.DurationVar(&mc.ReloadPeriod, "runtime-config.reload-period", 10*time.Second, "How often to check runtime config file.") +} + +// Manager periodically reloads the configuration from a file, and keeps this +// configuration available for clients. +type Manager struct { + cfg ManagerConfig + quit chan struct{} + + listenersMtx sync.Mutex + listeners []chan interface{} + + configMtx sync.RWMutex + config interface{} +} + +// NewRuntimeConfigManager creates an instance of Manager and starts reload config loop based on config +func NewRuntimeConfigManager(cfg ManagerConfig) (*Manager, error) { + mgr := Manager{ + cfg: cfg, + quit: make(chan struct{}), + } + + if cfg.LoadPath != "" { + if err := mgr.loadConfig(); err != nil { + // Log but don't stop on error - we don't want to halt all ingesters because of a typo + level.Error(util.Logger).Log("msg", "failed to load config", "err", err) + } + go mgr.loop() + } else { + level.Info(util.Logger).Log("msg", "runtime config disabled: file not specified") + } + + return &mgr, nil +} + +// CreateListenerChannel creates new channel that can be used to receive new config values. +// If there is no receiver waiting for value when config manager tries to send the update, +// or channel buffer is full, update is discarded. +// +// When config manager is stopped, it closes all channels to notify receivers that they will +// not receive any more updates. +func (om *Manager) CreateListenerChannel(buffer int) <-chan interface{} { + ch := make(chan interface{}, buffer) + + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + om.listeners = append(om.listeners, ch) + return ch +} + +// CloseListenerChannel removes given channel from list of channels to send notifications to and closes channel. +func (om *Manager) CloseListenerChannel(listener <-chan interface{}) { + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + for ix, ch := range om.listeners { + if ch == listener { + om.listeners = append(om.listeners[:ix], om.listeners[ix+1:]...) + close(ch) + break + } + } +} + +func (om *Manager) loop() { + ticker := time.NewTicker(om.cfg.ReloadPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + err := om.loadConfig() + if err != nil { + // Log but don't stop on error - we don't want to halt all ingesters because of a typo + level.Error(util.Logger).Log("msg", "failed to load config", "err", err) + } + case <-om.quit: + return + } + } +} + +// loadConfig loads configuration using the loader function, and if successful, +// stores it as current configuration and notifies listeners. +func (om *Manager) loadConfig() error { + cfg, err := om.cfg.Loader(om.cfg.LoadPath) + if err != nil { + overridesReloadSuccess.Set(0) + return err + } + overridesReloadSuccess.Set(1) + + om.setConfig(cfg) + om.callListeners(cfg) + + return nil +} + +func (om *Manager) setConfig(config interface{}) { + om.configMtx.Lock() + defer om.configMtx.Unlock() + om.config = config +} + +func (om *Manager) callListeners(newValue interface{}) { + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + for _, ch := range om.listeners { + select { + case ch <- newValue: + // ok + default: + // nobody is listening or buffer full. + } + } +} + +// Stop stops the Manager +func (om *Manager) Stop() { + close(om.quit) + + om.listenersMtx.Lock() + defer om.listenersMtx.Unlock() + + for _, ch := range om.listeners { + close(ch) + } + om.listeners = nil +} + +// GetConfig returns last loaded config value, possibly nil. +func (om *Manager) GetConfig() interface{} { + om.configMtx.RLock() + defer om.configMtx.RUnlock() + + return om.config +} diff --git a/pkg/util/runtimeconfig/manager_test.go b/pkg/util/runtimeconfig/manager_test.go new file mode 100644 index 00000000000..c4f590d6d50 --- /dev/null +++ b/pkg/util/runtimeconfig/manager_test.go @@ -0,0 +1,222 @@ +package runtimeconfig + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/uber-go/atomic" + "gopkg.in/yaml.v2" +) + +type TestLimits struct { + Limit1 int `json:"limit1"` + Limit2 int `json:"limit2"` +} + +// WARNING: THIS GLOBAL VARIABLE COULD LEAD TO UNEXPECTED BEHAVIOUR WHEN RUNNING MULTIPLE DIFFERENT TESTS +var defaultTestLimits *TestLimits + +type testOverrides struct { + Overrides map[string]*TestLimits `yaml:"overrides"` +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (l *TestLimits) UnmarshalYAML(unmarshal func(interface{}) error) error { + if defaultTestLimits != nil { + *l = *defaultTestLimits + } + type plain TestLimits + return unmarshal((*plain)(l)) +} + +func testLoadOverrides(filename string) (interface{}, error) { + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + + var overrides = &testOverrides{} + + decoder := yaml.NewDecoder(f) + decoder.SetStrict(true) + if err := decoder.Decode(&overrides); err != nil { + return nil, err + } + + return overrides, nil +} + +func TestNewOverridesManager(t *testing.T) { + tempFile, err := ioutil.TempFile("", "test-validation") + require.NoError(t, err) + + defer func() { + // Clean up + require.NoError(t, tempFile.Close()) + require.NoError(t, os.Remove(tempFile.Name())) + }() + + _, err = tempFile.WriteString(`overrides: + user1: + limit2: 150`) + require.NoError(t, err) + + defaultTestLimits = &TestLimits{Limit1: 100} + + // testing NewRuntimeConfigManager with overrides reload config set + overridesManagerConfig := ManagerConfig{ + ReloadPeriod: time.Second, + LoadPath: tempFile.Name(), + Loader: testLoadOverrides, + } + + overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig) + require.NoError(t, err) + + // Cleaning up + overridesManager.Stop() + + // Make sure test limits were loaded. + require.NotNil(t, overridesManager.GetConfig()) +} + +func TestOverridesManager_ListenerWithDefaultLimits(t *testing.T) { + tempFile, err := ioutil.TempFile("", "test-validation") + require.NoError(t, err) + require.NoError(t, tempFile.Close()) + + defer func() { + // Clean up + require.NoError(t, os.Remove(tempFile.Name())) + }() + + err = ioutil.WriteFile(tempFile.Name(), []byte(`overrides: + user1: + limit2: 150`), 0600) + require.NoError(t, err) + + defaultTestLimits = &TestLimits{Limit1: 100} + + // testing NewRuntimeConfigManager with overrides reload config set + overridesManagerConfig := ManagerConfig{ + ReloadPeriod: time.Second, + LoadPath: tempFile.Name(), + Loader: testLoadOverrides, + } + + overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig) + require.NoError(t, err) + + // need to use buffer, otherwise loadConfig will throw away update + ch := overridesManager.CreateListenerChannel(1) + + // rewrite file + err = ioutil.WriteFile(tempFile.Name(), []byte(`overrides: + user2: + limit2: 200`), 0600) + require.NoError(t, err) + + // reload + err = overridesManager.loadConfig() + require.NoError(t, err) + + var newValue interface{} + select { + case newValue = <-ch: + // ok + case <-time.After(time.Second): + t.Fatal("listener was not called") + } + + to := newValue.(*testOverrides) + require.Equal(t, 200, to.Overrides["user2"].Limit2) // new overrides + require.Equal(t, 100, to.Overrides["user2"].Limit1) // from defaults + + // Cleaning up + overridesManager.Stop() + + // Make sure test limits were loaded. + require.NotNil(t, overridesManager.GetConfig()) +} + +func TestOverridesManager_ListenerChannel(t *testing.T) { + var config = atomic.NewInt32(555) + + // testing NewRuntimeConfigManager with overrides reload config set + overridesManagerConfig := ManagerConfig{ + ReloadPeriod: 5 * time.Second, + LoadPath: "ignored", + Loader: func(filename string) (i interface{}, err error) { + val := int(config.Load()) + return val, nil + }, + } + + overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig) + require.NoError(t, err) + + // need to use buffer, otherwise loadConfig will throw away update + ch := overridesManager.CreateListenerChannel(1) + + err = overridesManager.loadConfig() + require.NoError(t, err) + + select { + case newValue := <-ch: + require.Equal(t, 555, newValue) + case <-time.After(time.Second): + t.Fatal("listener was not called") + } + + config.Store(1111) + err = overridesManager.loadConfig() + require.NoError(t, err) + + select { + case newValue := <-ch: + require.Equal(t, 1111, newValue) + case <-time.After(time.Second): + t.Fatal("listener was not called") + } + + overridesManager.CloseListenerChannel(ch) + select { + case _, ok := <-ch: + require.False(t, ok) + case <-time.After(time.Second): + t.Fatal("channel not closed") + } +} + +func TestOverridesManager_StopClosesListenerChannels(t *testing.T) { + var config = atomic.NewInt32(555) + + // testing NewRuntimeConfigManager with overrides reload config set + overridesManagerConfig := ManagerConfig{ + ReloadPeriod: 5 * time.Second, + LoadPath: "ignored", + Loader: func(filename string) (i interface{}, err error) { + val := int(config.Load()) + return val, nil + }, + } + + overridesManager, err := NewRuntimeConfigManager(overridesManagerConfig) + require.NoError(t, err) + + // need to use buffer, otherwise loadConfig will throw away update + ch := overridesManager.CreateListenerChannel(0) + + overridesManager.Stop() + + select { + case _, ok := <-ch: + require.False(t, ok) + case <-time.After(time.Second): + t.Fatal("channel not closed") + } +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 2eb60d9a163..bae57b70660 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -3,11 +3,8 @@ package validation import ( "errors" "flag" - "os" "time" - "gopkg.in/yaml.v2" - "github.com/cortexproject/cortex/pkg/util/flagext" ) @@ -55,7 +52,7 @@ type Limits struct { MaxQueryParallelism int `yaml:"max_query_parallelism"` CardinalityLimit int `yaml:"cardinality_limit"` - // Config for overrides, convenient if it goes here. + // Config for overrides, convenient if it goes here. [Deprecated in favor of RuntimeConfig flag in cortex.Config] PerTenantOverrideConfig string `yaml:"per_tenant_override_config"` PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"` } @@ -90,8 +87,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of queries will be scheduled in parallel by the frontend.") f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.") - f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.") - f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides.") + f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides. [deprecated, use -runtime-config.file instead]") + f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]") } // Validate the limits config and returns an error if the validation @@ -126,197 +123,169 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { // find a nicer way I'm afraid. var defaultLimits *Limits +// SetDefaultLimitsForYAMLUnmarshalling sets global default limits, used when loading +// Limits from YAML files. This is used to ensure per-tenant limits are defaulted to +// those values. +func SetDefaultLimitsForYAMLUnmarshalling(defaults Limits) { + defaultLimits = &defaults +} + +// TenantLimits is a function that returns limits for given tenant, or +// nil, if there are no tenant-specific limits. +type TenantLimits func(userID string) *Limits + // Overrides periodically fetch a set of per-user overrides, and provides convenience // functions for fetching the correct value. type Overrides struct { - overridesManager *OverridesManager + defaultLimits *Limits + tenantLimits TenantLimits } // NewOverrides makes a new Overrides. -// We store the supplied limits in a global variable to ensure per-tenant limits -// are defaulted to those values. As such, the last call to NewOverrides will -// become the new global defaults. -func NewOverrides(defaults Limits) (*Overrides, error) { - defaultLimits = &defaults - overridesManagerConfig := OverridesManagerConfig{ - OverridesReloadPeriod: defaults.PerTenantOverridePeriod, - OverridesLoadPath: defaults.PerTenantOverrideConfig, - OverridesLoader: loadOverrides, - Defaults: &defaults, - } - - overridesManager, err := NewOverridesManager(overridesManagerConfig) - if err != nil { - return nil, err - } - +func NewOverrides(defaults Limits, tenantLimits TenantLimits) (*Overrides, error) { return &Overrides{ - overridesManager: overridesManager, + tenantLimits: tenantLimits, + defaultLimits: &defaults, }, nil } -// Stop background reloading of overrides. -func (o *Overrides) Stop() { - o.overridesManager.Stop() -} - // IngestionRate returns the limit on ingester rate (samples per second). func (o *Overrides) IngestionRate(userID string) float64 { - return o.overridesManager.GetLimits(userID).(*Limits).IngestionRate + return o.getOverridesForUser(userID).IngestionRate } // IngestionRateStrategy returns whether the ingestion rate limit should be individually applied // to each distributor instance (local) or evenly shared across the cluster (global). func (o *Overrides) IngestionRateStrategy() string { // The ingestion rate strategy can't be overridden on a per-tenant basis - defaultLimits := o.overridesManager.cfg.Defaults - return defaultLimits.(*Limits).IngestionRateStrategy + return o.defaultLimits.IngestionRateStrategy } // IngestionBurstSize returns the burst size for ingestion rate. func (o *Overrides) IngestionBurstSize(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).IngestionBurstSize + return o.getOverridesForUser(userID).IngestionBurstSize } // AcceptHASamples returns whether the distributor should track and accept samples from HA replicas for this user. func (o *Overrides) AcceptHASamples(userID string) bool { - return o.overridesManager.GetLimits(userID).(*Limits).AcceptHASamples + return o.getOverridesForUser(userID).AcceptHASamples } // HAClusterLabel returns the cluster label to look for when deciding whether to accept a sample from a Prometheus HA replica. func (o *Overrides) HAClusterLabel(userID string) string { - return o.overridesManager.GetLimits(userID).(*Limits).HAClusterLabel + return o.getOverridesForUser(userID).HAClusterLabel } // HAReplicaLabel returns the replica label to look for when deciding whether to accept a sample from a Prometheus HA replica. func (o *Overrides) HAReplicaLabel(userID string) string { - return o.overridesManager.GetLimits(userID).(*Limits).HAReplicaLabel + return o.getOverridesForUser(userID).HAReplicaLabel } // DropLabels returns the list of labels to be dropped when ingesting HA samples for the user. func (o *Overrides) DropLabels(userID string) flagext.StringSlice { - return o.overridesManager.GetLimits(userID).(*Limits).DropLabels + return o.getOverridesForUser(userID).DropLabels } // MaxLabelNameLength returns maximum length a label name can be. func (o *Overrides) MaxLabelNameLength(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNameLength + return o.getOverridesForUser(userID).MaxLabelNameLength } // MaxLabelValueLength returns maximum length a label value can be. This also is // the maximum length of a metric name. func (o *Overrides) MaxLabelValueLength(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelValueLength + return o.getOverridesForUser(userID).MaxLabelValueLength } // MaxLabelNamesPerSeries returns maximum number of label/value pairs timeseries. func (o *Overrides) MaxLabelNamesPerSeries(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxLabelNamesPerSeries + return o.getOverridesForUser(userID).MaxLabelNamesPerSeries } // RejectOldSamples returns true when we should reject samples older than certain // age. func (o *Overrides) RejectOldSamples(userID string) bool { - return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamples + return o.getOverridesForUser(userID).RejectOldSamples } // RejectOldSamplesMaxAge returns the age at which samples should be rejected. func (o *Overrides) RejectOldSamplesMaxAge(userID string) time.Duration { - return o.overridesManager.GetLimits(userID).(*Limits).RejectOldSamplesMaxAge + return o.getOverridesForUser(userID).RejectOldSamplesMaxAge } // CreationGracePeriod is misnamed, and actually returns how far into the future // we should accept samples. func (o *Overrides) CreationGracePeriod(userID string) time.Duration { - return o.overridesManager.GetLimits(userID).(*Limits).CreationGracePeriod + return o.getOverridesForUser(userID).CreationGracePeriod } // MaxSeriesPerQuery returns the maximum number of series a query is allowed to hit. func (o *Overrides) MaxSeriesPerQuery(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxSeriesPerQuery + return o.getOverridesForUser(userID).MaxSeriesPerQuery } // MaxSamplesPerQuery returns the maximum number of samples in a query (from the ingester). func (o *Overrides) MaxSamplesPerQuery(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxSamplesPerQuery + return o.getOverridesForUser(userID).MaxSamplesPerQuery } // MaxLocalSeriesPerUser returns the maximum number of series a user is allowed to store in a single ingester. func (o *Overrides) MaxLocalSeriesPerUser(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxLocalSeriesPerUser + return o.getOverridesForUser(userID).MaxLocalSeriesPerUser } // MaxLocalSeriesPerMetric returns the maximum number of series allowed per metric in a single ingester. func (o *Overrides) MaxLocalSeriesPerMetric(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxLocalSeriesPerMetric + return o.getOverridesForUser(userID).MaxLocalSeriesPerMetric } // MaxGlobalSeriesPerUser returns the maximum number of series a user is allowed to store across the cluster. func (o *Overrides) MaxGlobalSeriesPerUser(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxGlobalSeriesPerUser + return o.getOverridesForUser(userID).MaxGlobalSeriesPerUser } // MaxGlobalSeriesPerMetric returns the maximum number of series allowed per metric across the cluster. func (o *Overrides) MaxGlobalSeriesPerMetric(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxGlobalSeriesPerMetric + return o.getOverridesForUser(userID).MaxGlobalSeriesPerMetric } // MaxChunksPerQuery returns the maximum number of chunks allowed per query. func (o *Overrides) MaxChunksPerQuery(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxChunksPerQuery + return o.getOverridesForUser(userID).MaxChunksPerQuery } // MaxQueryLength returns the limit of the length (in time) of a query. func (o *Overrides) MaxQueryLength(userID string) time.Duration { - return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryLength + return o.getOverridesForUser(userID).MaxQueryLength } // MaxQueryParallelism returns the limit to the number of sub-queries the // frontend will process in parallel. func (o *Overrides) MaxQueryParallelism(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MaxQueryParallelism + return o.getOverridesForUser(userID).MaxQueryParallelism } // EnforceMetricName whether to enforce the presence of a metric name. func (o *Overrides) EnforceMetricName(userID string) bool { - return o.overridesManager.GetLimits(userID).(*Limits).EnforceMetricName + return o.getOverridesForUser(userID).EnforceMetricName } // CardinalityLimit returns the maximum number of timeseries allowed in a query. func (o *Overrides) CardinalityLimit(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).CardinalityLimit + return o.getOverridesForUser(userID).CardinalityLimit } // MinChunkLength returns the minimum size of chunk that will be saved by ingesters func (o *Overrides) MinChunkLength(userID string) int { - return o.overridesManager.GetLimits(userID).(*Limits).MinChunkLength -} - -// Loads overrides and returns the limits as an interface to store them in OverridesManager. -// We need to implement it here since OverridesManager must store type Limits in an interface but -// it doesn't know its definition to initialize it. -// We could have used yamlv3.Node for this but there is no way to enforce strict decoding due to a bug in it -// TODO: Use yamlv3.Node to move this to OverridesManager after https://github.com/go-yaml/yaml/issues/460 is fixed -func loadOverrides(filename string) (map[string]interface{}, error) { - f, err := os.Open(filename) - if err != nil { - return nil, err - } - - var overrides struct { - Overrides map[string]*Limits `yaml:"overrides"` - } - - decoder := yaml.NewDecoder(f) - decoder.SetStrict(true) - if err := decoder.Decode(&overrides); err != nil { - return nil, err - } + return o.getOverridesForUser(userID).MinChunkLength +} - overridesAsInterface := map[string]interface{}{} - for userID := range overrides.Overrides { - overridesAsInterface[userID] = overrides.Overrides[userID] +func (o *Overrides) getOverridesForUser(userID string) *Limits { + if o.tenantLimits != nil { + l := o.tenantLimits(userID) + if l != nil { + return l + } } - - return overridesAsInterface, nil + return o.defaultLimits } diff --git a/pkg/util/validation/limits_test.go b/pkg/util/validation/limits_test.go index dcb4b15313b..eb4bbdcf9e0 100644 --- a/pkg/util/validation/limits_test.go +++ b/pkg/util/validation/limits_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" ) func TestLimits_Validate(t *testing.T) { @@ -39,3 +41,50 @@ func TestLimits_Validate(t *testing.T) { }) } } + +func TestOverridesManager_GetOverrides(t *testing.T) { + tenantLimits := map[string]*Limits{} + + defaults := Limits{ + MaxLabelNamesPerSeries: 100, + } + ov, err := NewOverrides(defaults, func(userID string) *Limits { + return tenantLimits[userID] + }) + + require.NoError(t, err) + + require.Equal(t, 100, ov.MaxLabelNamesPerSeries("user1")) + require.Equal(t, 0, ov.MaxLabelValueLength("user1")) + + // Update limits for tenant user1. We only update single field, the rest is copied from defaults. + // (That is how limits work when loaded from YAML) + l := Limits{} + l = defaults + l.MaxLabelValueLength = 150 + + tenantLimits["user1"] = &l + + // Checking whether overrides were enforced + require.Equal(t, 100, ov.MaxLabelNamesPerSeries("user1")) + require.Equal(t, 150, ov.MaxLabelValueLength("user1")) + + // Verifying user2 limits are not impacted by overrides + require.Equal(t, 100, ov.MaxLabelNamesPerSeries("user2")) + require.Equal(t, 0, ov.MaxLabelValueLength("user2")) +} + +func TestLimitsLoadingFromYaml(t *testing.T) { + SetDefaultLimitsForYAMLUnmarshalling(Limits{ + MaxLabelNameLength: 100, + }) + + inp := `ingestion_rate: 0.5` + + l := Limits{} + err := yaml.Unmarshal([]byte(inp), &l) + require.NoError(t, err) + + assert.Equal(t, 0.5, l.IngestionRate, "from yaml") + assert.Equal(t, 100, l.MaxLabelNameLength, "from defaults") +} diff --git a/pkg/util/validation/override.go b/pkg/util/validation/override.go deleted file mode 100644 index e6ef2d2a271..00000000000 --- a/pkg/util/validation/override.go +++ /dev/null @@ -1,107 +0,0 @@ -package validation - -import ( - "sync" - "time" - - "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" -) - -var overridesReloadSuccess = promauto.NewGauge(prometheus.GaugeOpts{ - Name: "cortex_overrides_last_reload_successful", - Help: "Whether the last overrides reload attempt was successful.", -}) - -// OverridesLoader loads the overrides -type OverridesLoader func(string) (map[string]interface{}, error) - -// OverridesManagerConfig holds the config for an OverridesManager instance. -// It holds config related to loading per-tentant overrides and the default limits -type OverridesManagerConfig struct { - OverridesReloadPeriod time.Duration - OverridesLoadPath string - OverridesLoader OverridesLoader - Defaults interface{} -} - -// OverridesManager manages default and per user limits i.e overrides. -// It can periodically keep reloading overrides based on config. -type OverridesManager struct { - cfg OverridesManagerConfig - overrides map[string]interface{} - overridesMtx sync.RWMutex - quit chan struct{} -} - -// NewOverridesManager creates an instance of OverridesManager and starts reload overrides loop based on config -func NewOverridesManager(cfg OverridesManagerConfig) (*OverridesManager, error) { - overridesManager := OverridesManager{ - cfg: cfg, - quit: make(chan struct{}), - } - - if cfg.OverridesLoadPath != "" { - if err := overridesManager.loadOverrides(); err != nil { - // Log but don't stop on error - we don't want to halt all ingesters because of a typo - level.Error(util.Logger).Log("msg", "failed to load limit overrides", "err", err) - } - go overridesManager.loop() - } else { - level.Info(util.Logger).Log("msg", "per-tenant overrides disabled") - } - - return &overridesManager, nil -} - -func (om *OverridesManager) loop() { - ticker := time.NewTicker(om.cfg.OverridesReloadPeriod) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - err := om.loadOverrides() - if err != nil { - // Log but don't stop on error - we don't want to halt all ingesters because of a typo - level.Error(util.Logger).Log("msg", "failed to load limit overrides", "err", err) - } - case <-om.quit: - return - } - } -} - -func (om *OverridesManager) loadOverrides() error { - overrides, err := om.cfg.OverridesLoader(om.cfg.OverridesLoadPath) - if err != nil { - overridesReloadSuccess.Set(0) - return err - } - overridesReloadSuccess.Set(1) - - om.overridesMtx.Lock() - defer om.overridesMtx.Unlock() - om.overrides = overrides - return nil -} - -// Stop stops the OverridesManager -func (om *OverridesManager) Stop() { - close(om.quit) -} - -// GetLimits returns Limits for a specific userID if its set otherwise the default Limits -func (om *OverridesManager) GetLimits(userID string) interface{} { - om.overridesMtx.RLock() - defer om.overridesMtx.RUnlock() - - override, ok := om.overrides[userID] - if !ok { - return om.cfg.Defaults - } - - return override -} diff --git a/pkg/util/validation/override_test.go b/pkg/util/validation/override_test.go deleted file mode 100644 index 9792ecefe07..00000000000 --- a/pkg/util/validation/override_test.go +++ /dev/null @@ -1,141 +0,0 @@ -package validation - -import ( - "io/ioutil" - "os" - "testing" - "time" - - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v2" -) - -type TestLimits struct { - Limit1 int `json:"limit1"` - Limit2 int `json:"limit2"` -} - -// WARNING: THIS GLOBAL VARIABLE COULD LEAD TO UNEXPECTED BEHAVIOUR WHEN RUNNING MULTIPLE DIFFERENT TESTS -var defaultTestLimits *TestLimits - -// UnmarshalYAML implements the yaml.Unmarshaler interface. -func (l *TestLimits) UnmarshalYAML(unmarshal func(interface{}) error) error { - if defaultTestLimits != nil { - *l = *defaultTestLimits - } - type plain TestLimits - return unmarshal((*plain)(l)) -} - -func testLoadOverrides(filename string) (map[string]interface{}, error) { - f, err := os.Open(filename) - if err != nil { - return nil, err - } - - var overrides struct { - Overrides map[string]*TestLimits `yaml:"overrides"` - } - - decoder := yaml.NewDecoder(f) - decoder.SetStrict(true) - if err := decoder.Decode(&overrides); err != nil { - return nil, err - } - - overridesAsInterface := map[string]interface{}{} - for k := range overrides.Overrides { - overridesAsInterface[k] = overrides.Overrides[k] - } - - return overridesAsInterface, nil -} - -func TestNewOverridesManager(t *testing.T) { - tempFile, err := ioutil.TempFile("", "test-validation") - require.NoError(t, err) - - defer func() { - // Clean up - require.NoError(t, tempFile.Close()) - require.NoError(t, os.Remove(tempFile.Name())) - }() - - _, err = tempFile.WriteString(`overrides: - user1: - limit2: 150`) - require.NoError(t, err) - - defaultTestLimits = &TestLimits{Limit1: 100} - - // testing NewOverridesManager with overrides reload config set - overridesManagerConfig := OverridesManagerConfig{ - OverridesReloadPeriod: time.Second, - OverridesLoadPath: tempFile.Name(), - OverridesLoader: testLoadOverrides, - Defaults: defaultTestLimits, - } - - var overridesManager *OverridesManager - done := make(chan struct{}) - - go func() { - overridesManager, err = NewOverridesManager(overridesManagerConfig) - close(done) - }() - - select { - case <-time.After(time.Second): - t.Fatal("failed to get a response from NewOverridesManager() before timeout") - case <-done: - } - require.NoError(t, err) - - // Cleaning up - overridesManager.Stop() -} - -func TestOverridesManager_GetLimits(t *testing.T) { - defaultTestLimits = &TestLimits{Limit1: 100} - overridesManagerConfig := OverridesManagerConfig{ - OverridesReloadPeriod: 0, - OverridesLoadPath: "", - OverridesLoader: testLoadOverrides, - Defaults: defaultTestLimits, - } - - overridesManager, err := NewOverridesManager(overridesManagerConfig) - require.NoError(t, err) - - require.Equal(t, 100, overridesManager.GetLimits("user1").(*TestLimits).Limit1) - require.Equal(t, 0, overridesManager.GetLimits("user1").(*TestLimits).Limit2) - - // Setting up perTenantOverrides for user user1 - tempFile, err := ioutil.TempFile("", "test-validation") - require.NoError(t, err) - - defer func() { - // Clean up - require.NoError(t, tempFile.Close()) - require.NoError(t, os.Remove(tempFile.Name())) - }() - - _, err = tempFile.WriteString(`overrides: - user1: - limit2: 150`) - require.NoError(t, err) - - overridesManager.cfg.OverridesLoadPath = tempFile.Name() - require.NoError(t, overridesManager.loadOverrides()) - - // Checking whether overrides were enforced - require.Equal(t, 100, overridesManager.GetLimits("user1").(*TestLimits).Limit1) - require.Equal(t, 150, overridesManager.GetLimits("user1").(*TestLimits).Limit2) - - // Verifying user2 limits are not impacted by overrides - require.Equal(t, 100, overridesManager.GetLimits("user2").(*TestLimits).Limit1) - require.Equal(t, 0, overridesManager.GetLimits("user2").(*TestLimits).Limit2) - - // Cleaning up - overridesManager.Stop() -} diff --git a/vendor/github.com/uber-go/atomic/.codecov.yml b/vendor/github.com/uber-go/atomic/.codecov.yml new file mode 100644 index 00000000000..6d4d1be7b57 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/.codecov.yml @@ -0,0 +1,15 @@ +coverage: + range: 80..100 + round: down + precision: 2 + + status: + project: # measuring the overall project coverage + default: # context, you can create multiple ones with custom titles + enabled: yes # must be yes|true to enable this status + target: 100 # specify the target coverage for each commit status + # option: "auto" (must increase from parent commit or pull request base) + # option: "X%" a static target percentage to hit + if_not_found: success # if parent is not found report status as success, error, or failure + if_ci_failed: error # if ci fails report status as success, error, or failure + diff --git a/vendor/github.com/uber-go/atomic/.gitignore b/vendor/github.com/uber-go/atomic/.gitignore new file mode 100644 index 00000000000..0a4504f1109 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/.gitignore @@ -0,0 +1,11 @@ +.DS_Store +/vendor +/cover +cover.out +lint.log + +# Binaries +*.test + +# Profiling output +*.prof diff --git a/vendor/github.com/uber-go/atomic/.travis.yml b/vendor/github.com/uber-go/atomic/.travis.yml new file mode 100644 index 00000000000..0f3769e5fa6 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/.travis.yml @@ -0,0 +1,27 @@ +sudo: false +language: go +go_import_path: go.uber.org/atomic + +go: + - 1.11.x + - 1.12.x + +matrix: + include: + - go: 1.12.x + env: NO_TEST=yes LINT=yes + +cache: + directories: + - vendor + +install: + - make install_ci + +script: + - test -n "$NO_TEST" || make test_ci + - test -n "$NO_TEST" || scripts/test-ubergo.sh + - test -z "$LINT" || make install_lint lint + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/vendor/github.com/uber-go/atomic/LICENSE.txt b/vendor/github.com/uber-go/atomic/LICENSE.txt new file mode 100644 index 00000000000..8765c9fbc61 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/LICENSE.txt @@ -0,0 +1,19 @@ +Copyright (c) 2016 Uber Technologies, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/uber-go/atomic/Makefile b/vendor/github.com/uber-go/atomic/Makefile new file mode 100644 index 00000000000..1ef263075d7 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/Makefile @@ -0,0 +1,51 @@ +# Many Go tools take file globs or directories as arguments instead of packages. +PACKAGE_FILES ?= *.go + +# For pre go1.6 +export GO15VENDOREXPERIMENT=1 + + +.PHONY: build +build: + go build -i ./... + + +.PHONY: install +install: + glide --version || go get github.com/Masterminds/glide + glide install + + +.PHONY: test +test: + go test -cover -race ./... + + +.PHONY: install_ci +install_ci: install + go get github.com/wadey/gocovmerge + go get github.com/mattn/goveralls + go get golang.org/x/tools/cmd/cover + +.PHONY: install_lint +install_lint: + go get golang.org/x/lint/golint + + +.PHONY: lint +lint: + @rm -rf lint.log + @echo "Checking formatting..." + @gofmt -d -s $(PACKAGE_FILES) 2>&1 | tee lint.log + @echo "Checking vet..." + @go vet ./... 2>&1 | tee -a lint.log;) + @echo "Checking lint..." + @golint $$(go list ./...) 2>&1 | tee -a lint.log + @echo "Checking for unresolved FIXMEs..." + @git grep -i fixme | grep -v -e vendor -e Makefile | tee -a lint.log + @[ ! -s lint.log ] + + +.PHONY: test_ci +test_ci: install_ci build + ./scripts/cover.sh $(shell go list $(PACKAGES)) diff --git a/vendor/github.com/uber-go/atomic/README.md b/vendor/github.com/uber-go/atomic/README.md new file mode 100644 index 00000000000..62eb8e57609 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/README.md @@ -0,0 +1,36 @@ +# atomic [![GoDoc][doc-img]][doc] [![Build Status][ci-img]][ci] [![Coverage Status][cov-img]][cov] [![Go Report Card][reportcard-img]][reportcard] + +Simple wrappers for primitive types to enforce atomic access. + +## Installation +`go get -u go.uber.org/atomic` + +## Usage +The standard library's `sync/atomic` is powerful, but it's easy to forget which +variables must be accessed atomically. `go.uber.org/atomic` preserves all the +functionality of the standard library, but wraps the primitive types to +provide a safer, more convenient API. + +```go +var atom atomic.Uint32 +atom.Store(42) +atom.Sub(2) +atom.CAS(40, 11) +``` + +See the [documentation][doc] for a complete API specification. + +## Development Status +Stable. + +___ +Released under the [MIT License](LICENSE.txt). + +[doc-img]: https://godoc.org/github.com/uber-go/atomic?status.svg +[doc]: https://godoc.org/go.uber.org/atomic +[ci-img]: https://travis-ci.com/uber-go/atomic.svg?branch=master +[ci]: https://travis-ci.com/uber-go/atomic +[cov-img]: https://codecov.io/gh/uber-go/atomic/branch/master/graph/badge.svg +[cov]: https://codecov.io/gh/uber-go/atomic +[reportcard-img]: https://goreportcard.com/badge/go.uber.org/atomic +[reportcard]: https://goreportcard.com/report/go.uber.org/atomic diff --git a/vendor/github.com/uber-go/atomic/atomic.go b/vendor/github.com/uber-go/atomic/atomic.go new file mode 100644 index 00000000000..1db6849fca0 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/atomic.go @@ -0,0 +1,351 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Package atomic provides simple wrappers around numerics to enforce atomic +// access. +package atomic + +import ( + "math" + "sync/atomic" + "time" +) + +// Int32 is an atomic wrapper around an int32. +type Int32 struct{ v int32 } + +// NewInt32 creates an Int32. +func NewInt32(i int32) *Int32 { + return &Int32{i} +} + +// Load atomically loads the wrapped value. +func (i *Int32) Load() int32 { + return atomic.LoadInt32(&i.v) +} + +// Add atomically adds to the wrapped int32 and returns the new value. +func (i *Int32) Add(n int32) int32 { + return atomic.AddInt32(&i.v, n) +} + +// Sub atomically subtracts from the wrapped int32 and returns the new value. +func (i *Int32) Sub(n int32) int32 { + return atomic.AddInt32(&i.v, -n) +} + +// Inc atomically increments the wrapped int32 and returns the new value. +func (i *Int32) Inc() int32 { + return i.Add(1) +} + +// Dec atomically decrements the wrapped int32 and returns the new value. +func (i *Int32) Dec() int32 { + return i.Sub(1) +} + +// CAS is an atomic compare-and-swap. +func (i *Int32) CAS(old, new int32) bool { + return atomic.CompareAndSwapInt32(&i.v, old, new) +} + +// Store atomically stores the passed value. +func (i *Int32) Store(n int32) { + atomic.StoreInt32(&i.v, n) +} + +// Swap atomically swaps the wrapped int32 and returns the old value. +func (i *Int32) Swap(n int32) int32 { + return atomic.SwapInt32(&i.v, n) +} + +// Int64 is an atomic wrapper around an int64. +type Int64 struct{ v int64 } + +// NewInt64 creates an Int64. +func NewInt64(i int64) *Int64 { + return &Int64{i} +} + +// Load atomically loads the wrapped value. +func (i *Int64) Load() int64 { + return atomic.LoadInt64(&i.v) +} + +// Add atomically adds to the wrapped int64 and returns the new value. +func (i *Int64) Add(n int64) int64 { + return atomic.AddInt64(&i.v, n) +} + +// Sub atomically subtracts from the wrapped int64 and returns the new value. +func (i *Int64) Sub(n int64) int64 { + return atomic.AddInt64(&i.v, -n) +} + +// Inc atomically increments the wrapped int64 and returns the new value. +func (i *Int64) Inc() int64 { + return i.Add(1) +} + +// Dec atomically decrements the wrapped int64 and returns the new value. +func (i *Int64) Dec() int64 { + return i.Sub(1) +} + +// CAS is an atomic compare-and-swap. +func (i *Int64) CAS(old, new int64) bool { + return atomic.CompareAndSwapInt64(&i.v, old, new) +} + +// Store atomically stores the passed value. +func (i *Int64) Store(n int64) { + atomic.StoreInt64(&i.v, n) +} + +// Swap atomically swaps the wrapped int64 and returns the old value. +func (i *Int64) Swap(n int64) int64 { + return atomic.SwapInt64(&i.v, n) +} + +// Uint32 is an atomic wrapper around an uint32. +type Uint32 struct{ v uint32 } + +// NewUint32 creates a Uint32. +func NewUint32(i uint32) *Uint32 { + return &Uint32{i} +} + +// Load atomically loads the wrapped value. +func (i *Uint32) Load() uint32 { + return atomic.LoadUint32(&i.v) +} + +// Add atomically adds to the wrapped uint32 and returns the new value. +func (i *Uint32) Add(n uint32) uint32 { + return atomic.AddUint32(&i.v, n) +} + +// Sub atomically subtracts from the wrapped uint32 and returns the new value. +func (i *Uint32) Sub(n uint32) uint32 { + return atomic.AddUint32(&i.v, ^(n - 1)) +} + +// Inc atomically increments the wrapped uint32 and returns the new value. +func (i *Uint32) Inc() uint32 { + return i.Add(1) +} + +// Dec atomically decrements the wrapped int32 and returns the new value. +func (i *Uint32) Dec() uint32 { + return i.Sub(1) +} + +// CAS is an atomic compare-and-swap. +func (i *Uint32) CAS(old, new uint32) bool { + return atomic.CompareAndSwapUint32(&i.v, old, new) +} + +// Store atomically stores the passed value. +func (i *Uint32) Store(n uint32) { + atomic.StoreUint32(&i.v, n) +} + +// Swap atomically swaps the wrapped uint32 and returns the old value. +func (i *Uint32) Swap(n uint32) uint32 { + return atomic.SwapUint32(&i.v, n) +} + +// Uint64 is an atomic wrapper around a uint64. +type Uint64 struct{ v uint64 } + +// NewUint64 creates a Uint64. +func NewUint64(i uint64) *Uint64 { + return &Uint64{i} +} + +// Load atomically loads the wrapped value. +func (i *Uint64) Load() uint64 { + return atomic.LoadUint64(&i.v) +} + +// Add atomically adds to the wrapped uint64 and returns the new value. +func (i *Uint64) Add(n uint64) uint64 { + return atomic.AddUint64(&i.v, n) +} + +// Sub atomically subtracts from the wrapped uint64 and returns the new value. +func (i *Uint64) Sub(n uint64) uint64 { + return atomic.AddUint64(&i.v, ^(n - 1)) +} + +// Inc atomically increments the wrapped uint64 and returns the new value. +func (i *Uint64) Inc() uint64 { + return i.Add(1) +} + +// Dec atomically decrements the wrapped uint64 and returns the new value. +func (i *Uint64) Dec() uint64 { + return i.Sub(1) +} + +// CAS is an atomic compare-and-swap. +func (i *Uint64) CAS(old, new uint64) bool { + return atomic.CompareAndSwapUint64(&i.v, old, new) +} + +// Store atomically stores the passed value. +func (i *Uint64) Store(n uint64) { + atomic.StoreUint64(&i.v, n) +} + +// Swap atomically swaps the wrapped uint64 and returns the old value. +func (i *Uint64) Swap(n uint64) uint64 { + return atomic.SwapUint64(&i.v, n) +} + +// Bool is an atomic Boolean. +type Bool struct{ v uint32 } + +// NewBool creates a Bool. +func NewBool(initial bool) *Bool { + return &Bool{boolToInt(initial)} +} + +// Load atomically loads the Boolean. +func (b *Bool) Load() bool { + return truthy(atomic.LoadUint32(&b.v)) +} + +// CAS is an atomic compare-and-swap. +func (b *Bool) CAS(old, new bool) bool { + return atomic.CompareAndSwapUint32(&b.v, boolToInt(old), boolToInt(new)) +} + +// Store atomically stores the passed value. +func (b *Bool) Store(new bool) { + atomic.StoreUint32(&b.v, boolToInt(new)) +} + +// Swap sets the given value and returns the previous value. +func (b *Bool) Swap(new bool) bool { + return truthy(atomic.SwapUint32(&b.v, boolToInt(new))) +} + +// Toggle atomically negates the Boolean and returns the previous value. +func (b *Bool) Toggle() bool { + return truthy(atomic.AddUint32(&b.v, 1) - 1) +} + +func truthy(n uint32) bool { + return n&1 == 1 +} + +func boolToInt(b bool) uint32 { + if b { + return 1 + } + return 0 +} + +// Float64 is an atomic wrapper around float64. +type Float64 struct { + v uint64 +} + +// NewFloat64 creates a Float64. +func NewFloat64(f float64) *Float64 { + return &Float64{math.Float64bits(f)} +} + +// Load atomically loads the wrapped value. +func (f *Float64) Load() float64 { + return math.Float64frombits(atomic.LoadUint64(&f.v)) +} + +// Store atomically stores the passed value. +func (f *Float64) Store(s float64) { + atomic.StoreUint64(&f.v, math.Float64bits(s)) +} + +// Add atomically adds to the wrapped float64 and returns the new value. +func (f *Float64) Add(s float64) float64 { + for { + old := f.Load() + new := old + s + if f.CAS(old, new) { + return new + } + } +} + +// Sub atomically subtracts from the wrapped float64 and returns the new value. +func (f *Float64) Sub(s float64) float64 { + return f.Add(-s) +} + +// CAS is an atomic compare-and-swap. +func (f *Float64) CAS(old, new float64) bool { + return atomic.CompareAndSwapUint64(&f.v, math.Float64bits(old), math.Float64bits(new)) +} + +// Duration is an atomic wrapper around time.Duration +// https://godoc.org/time#Duration +type Duration struct { + v Int64 +} + +// NewDuration creates a Duration. +func NewDuration(d time.Duration) *Duration { + return &Duration{v: *NewInt64(int64(d))} +} + +// Load atomically loads the wrapped value. +func (d *Duration) Load() time.Duration { + return time.Duration(d.v.Load()) +} + +// Store atomically stores the passed value. +func (d *Duration) Store(n time.Duration) { + d.v.Store(int64(n)) +} + +// Add atomically adds to the wrapped time.Duration and returns the new value. +func (d *Duration) Add(n time.Duration) time.Duration { + return time.Duration(d.v.Add(int64(n))) +} + +// Sub atomically subtracts from the wrapped time.Duration and returns the new value. +func (d *Duration) Sub(n time.Duration) time.Duration { + return time.Duration(d.v.Sub(int64(n))) +} + +// Swap atomically swaps the wrapped time.Duration and returns the old value. +func (d *Duration) Swap(n time.Duration) time.Duration { + return time.Duration(d.v.Swap(int64(n))) +} + +// CAS is an atomic compare-and-swap. +func (d *Duration) CAS(old, new time.Duration) bool { + return d.v.CAS(int64(old), int64(new)) +} + +// Value shadows the type of the same name from sync/atomic +// https://godoc.org/sync/atomic#Value +type Value struct{ atomic.Value } diff --git a/vendor/github.com/uber-go/atomic/error.go b/vendor/github.com/uber-go/atomic/error.go new file mode 100644 index 00000000000..0489d19badb --- /dev/null +++ b/vendor/github.com/uber-go/atomic/error.go @@ -0,0 +1,55 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package atomic + +// Error is an atomic type-safe wrapper around Value for errors +type Error struct{ v Value } + +// errorHolder is non-nil holder for error object. +// atomic.Value panics on saving nil object, so err object needs to be +// wrapped with valid object first. +type errorHolder struct{ err error } + +// NewError creates new atomic error object +func NewError(err error) *Error { + e := &Error{} + if err != nil { + e.Store(err) + } + return e +} + +// Load atomically loads the wrapped error +func (e *Error) Load() error { + v := e.v.Load() + if v == nil { + return nil + } + + eh := v.(errorHolder) + return eh.err +} + +// Store atomically stores error. +// NOTE: a holder object is allocated on each Store call. +func (e *Error) Store(err error) { + e.v.Store(errorHolder{err: err}) +} diff --git a/vendor/github.com/uber-go/atomic/glide.lock b/vendor/github.com/uber-go/atomic/glide.lock new file mode 100644 index 00000000000..3c72c59976d --- /dev/null +++ b/vendor/github.com/uber-go/atomic/glide.lock @@ -0,0 +1,17 @@ +hash: f14d51408e3e0e4f73b34e4039484c78059cd7fc5f4996fdd73db20dc8d24f53 +updated: 2016-10-27T00:10:51.16960137-07:00 +imports: [] +testImports: +- name: github.com/davecgh/go-spew + version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib +- name: github.com/stretchr/testify + version: d77da356e56a7428ad25149ca77381849a6a5232 + subpackages: + - assert + - require diff --git a/vendor/github.com/uber-go/atomic/glide.yaml b/vendor/github.com/uber-go/atomic/glide.yaml new file mode 100644 index 00000000000..4cf608ec0f8 --- /dev/null +++ b/vendor/github.com/uber-go/atomic/glide.yaml @@ -0,0 +1,6 @@ +package: go.uber.org/atomic +testImport: +- package: github.com/stretchr/testify + subpackages: + - assert + - require diff --git a/vendor/github.com/uber-go/atomic/string.go b/vendor/github.com/uber-go/atomic/string.go new file mode 100644 index 00000000000..ede8136face --- /dev/null +++ b/vendor/github.com/uber-go/atomic/string.go @@ -0,0 +1,49 @@ +// Copyright (c) 2016 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package atomic + +// String is an atomic type-safe wrapper around Value for strings. +type String struct{ v Value } + +// NewString creates a String. +func NewString(str string) *String { + s := &String{} + if str != "" { + s.Store(str) + } + return s +} + +// Load atomically loads the wrapped string. +func (s *String) Load() string { + v := s.v.Load() + if v == nil { + return "" + } + return v.(string) +} + +// Store atomically stores the passed string. +// Note: Converting the string to an interface{} to store in the Value +// requires an allocation. +func (s *String) Store(str string) { + s.v.Store(str) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index d0809d84782..228205ec00c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -538,6 +538,8 @@ github.com/thanos-io/thanos/pkg/tracing github.com/tinylib/msgp/msgp # github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 github.com/tmc/grpc-websocket-proxy/wsproxy +# github.com/uber-go/atomic v1.4.0 +github.com/uber-go/atomic # github.com/uber/jaeger-client-go v2.20.1+incompatible github.com/uber/jaeger-client-go github.com/uber/jaeger-client-go/config