From b703c6605d9901cdd9e045d4d13c141d1f4d133c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 22 Nov 2018 13:51:41 +0000 Subject: [PATCH 1/8] Move & split up util/flags.go to utils/flagext. Signed-off-by: Tom Wilkie --- cmd/alertmanager/main.go | 3 +- cmd/configs/main.go | 3 +- cmd/distributor/main.go | 3 +- cmd/ingester/main.go | 3 +- cmd/lite/main.go | 5 +- cmd/querier/main.go | 5 +- cmd/query-frontend/main.go | 3 +- cmd/ruler/main.go | 3 +- cmd/table-manager/main.go | 3 +- cmd/test-exporter/main.go | 8 +- pkg/alertmanager/multitenant.go | 5 +- pkg/chunk/aws/storage_client.go | 7 +- pkg/chunk/chunk_store_test.go | 44 +++++------ pkg/chunk/schema_config.go | 19 ++--- pkg/chunk/storage/factory_test.go | 4 +- pkg/chunk/testutils/testutils.go | 8 +- pkg/distributor/distributor_test.go | 4 +- pkg/ingester/lifecycle_test.go | 7 +- pkg/querier/correctness/time_flag.go | 2 +- pkg/querier/frontend/frontend_test.go | 4 +- pkg/querier/frontend/results_cache_test.go | 4 +- pkg/querier/querier_test.go | 3 +- pkg/ruler/configs.go | 4 +- pkg/ruler/ruler.go | 5 +- pkg/util/flagext/day.go | 45 +++++++++++ pkg/util/flagext/register.go | 24 ++++++ pkg/util/flagext/url.go | 26 +++++++ pkg/util/flags.go | 91 ---------------------- pkg/util/validation/limits.go | 4 +- pkg/util/validation/validate_test.go | 4 +- 30 files changed, 186 insertions(+), 167 deletions(-) create mode 100644 pkg/util/flagext/day.go create mode 100644 pkg/util/flagext/register.go create mode 100644 pkg/util/flagext/url.go delete mode 100644 pkg/util/flags.go diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 95c1752dc14..1fdc784dcb5 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -21,6 +21,7 @@ import ( "github.com/cortexproject/cortex/pkg/alertmanager" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -36,7 +37,7 @@ func main() { } alertmanagerConfig alertmanager.MultitenantAlertmanagerConfig ) - util.RegisterFlags(&serverConfig, &alertmanagerConfig) + flagext.RegisterFlags(&serverConfig, &alertmanagerConfig) flag.Parse() util.InitLogger(&serverConfig) diff --git a/cmd/configs/main.go b/cmd/configs/main.go index 2e6d65ce799..ee94dbd0847 100644 --- a/cmd/configs/main.go +++ b/cmd/configs/main.go @@ -10,6 +10,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs/api" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" ) @@ -26,7 +27,7 @@ func main() { } dbConfig db.Config ) - util.RegisterFlags(&serverConfig, &dbConfig) + flagext.RegisterFlags(&serverConfig, &dbConfig) flag.Parse() util.InitLogger(&serverConfig) diff --git a/cmd/distributor/main.go b/cmd/distributor/main.go index c61e2bd4fca..dc0c7a0a5e8 100644 --- a/cmd/distributor/main.go +++ b/cmd/distributor/main.go @@ -15,6 +15,7 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -53,7 +54,7 @@ func main() { limits validation.Limits preallocConfig client.PreallocConfig ) - util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, + flagext.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, &preallocConfig) flag.Parse() diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 47c4d0a094c..3d7bef8e968 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -16,6 +16,7 @@ import ( "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -61,7 +62,7 @@ func main() { // Ingester needs to know our gRPC listen port. ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort - util.RegisterFlags(&serverConfig, &chunkStoreConfig, &storageConfig, + flagext.RegisterFlags(&serverConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &ingesterConfig, &clientConfig, &limits, &preallocConfig, &marshalConfig) flag.UintVar(&maxStreams, "ingester.max-concurrent-streams", 1000, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)") flag.IntVar(&eventSampleRate, "event.sample-rate", 0, "How often to sample observability events (0 = never).") diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 72ce752002d..7feb74c54e3 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -24,6 +24,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -149,7 +150,7 @@ func main() { map[string]string{}, // TODO: include configuration flags func(f http.HandlerFunc) http.HandlerFunc { return f }, func() *tsdb.DB { return nil }, // Only needed for admin APIs. - false, // Disable admin APIs. + false, // Disable admin APIs. util.Logger, querier.DummyRulesRetriever{}, ) @@ -209,7 +210,7 @@ func getConfigsFromCommandLine() { } // Ingester needs to know our gRPC listen port. ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort - util.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig, + flagext.RegisterFlags(&serverConfig, &chunkStoreConfig, &distributorConfig, &querierConfig, &ingesterConfig, &configStoreConfig, &rulerConfig, &storageConfig, &schemaConfig, &ingesterClientConfig, &limitsConfig, &tbmConfig) flag.BoolVar(&unauthenticated, "unauthenticated", false, "Set to true to disable multitenancy.") diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 41969792da3..8b868e49e78 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -23,6 +23,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" httpgrpc_server "github.com/weaveworks/common/httpgrpc/server" "github.com/weaveworks/common/middleware" @@ -49,7 +50,7 @@ func main() { workerConfig frontend.WorkerConfig queryParallelism int ) - util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, + flagext.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, &querierConfig, &chunkStoreConfig, &schemaConfig, &storageConfig, &workerConfig) flag.IntVar(&queryParallelism, "querier.query-parallelism", 100, "Max subqueries run in parallel per higher-level query.") flag.Parse() @@ -114,7 +115,7 @@ func main() { map[string]string{}, // TODO: include configuration flags func(f http.HandlerFunc) http.HandlerFunc { return f }, func() *tsdb.DB { return nil }, // Only needed for admin APIs. - false, // Disable admin APIs. + false, // Disable admin APIs. util.Logger, querier.DummyRulesRetriever{}, ) diff --git a/cmd/query-frontend/main.go b/cmd/query-frontend/main.go index 14c9677232f..824b3ecedf4 100644 --- a/cmd/query-frontend/main.go +++ b/cmd/query-frontend/main.go @@ -9,6 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/frontend" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" "github.com/weaveworks/common/tracing" @@ -25,7 +26,7 @@ func main() { frontendConfig frontend.Config maxMessageSize int ) - util.RegisterFlags(&serverConfig, &frontendConfig) + flagext.RegisterFlags(&serverConfig, &frontendConfig) flag.IntVar(&maxMessageSize, "query-frontend.max-recv-message-size-bytes", 1024*1024*64, "Limit on the size of a grpc message this server can receive.") flag.Parse() diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go index 1766ab8907b..9ca70b604d3 100644 --- a/cmd/ruler/main.go +++ b/cmd/ruler/main.go @@ -16,6 +16,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ruler" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" @@ -47,7 +48,7 @@ func main() { trace := tracing.NewFromEnv("ruler") defer trace.Close() - util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, + flagext.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &clientConfig, &limits, &rulerConfig, &chunkStoreConfig, &storageConfig, &schemaConfig, &configStoreConfig, &querierConfig) flag.Parse() diff --git a/cmd/table-manager/main.go b/cmd/table-manager/main.go index 2f18f81b7c9..1a3f12721a8 100644 --- a/cmd/table-manager/main.go +++ b/cmd/table-manager/main.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/server" ) @@ -29,7 +30,7 @@ func main() { schemaConfig chunk.SchemaConfig tbmConfig chunk.TableManagerConfig ) - util.RegisterFlags(&ingesterConfig, &serverConfig, &storageConfig, &schemaConfig, &tbmConfig) + flagext.RegisterFlags(&ingesterConfig, &serverConfig, &storageConfig, &schemaConfig, &tbmConfig) flag.Parse() util.InitLogger(&serverConfig) diff --git a/cmd/test-exporter/main.go b/cmd/test-exporter/main.go index eeae3666450..eba57905eeb 100644 --- a/cmd/test-exporter/main.go +++ b/cmd/test-exporter/main.go @@ -8,11 +8,12 @@ import ( "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/server" + "github.com/weaveworks/common/tracing" "github.com/cortexproject/cortex/pkg/querier/correctness" "github.com/cortexproject/cortex/pkg/util" - "github.com/weaveworks/common/server" - "github.com/weaveworks/common/tracing" + "github.com/cortexproject/cortex/pkg/util/flagext" ) var ( @@ -24,8 +25,7 @@ func main() { serverConfig server.Config runnerConfig correctness.RunnerConfig ) - serverConfig.RegisterFlags(flag.CommandLine) - runnerConfig.RegisterFlags(flag.CommandLine) + flagext.RegisterFlags(&serverConfig, &runnerConfig) flag.Parse() // Setting the environment variable JAEGER_AGENT_HOST enables tracing diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index c090302fca0..f8372cd574f 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -22,6 +22,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs" configs_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" "github.com/weaveworks/mesh" @@ -176,8 +177,8 @@ func counts(counts map[string]int, keys []string) string { type MultitenantAlertmanagerConfig struct { DataDir string Retention time.Duration - ExternalURL util.URLValue - ConfigsAPIURL util.URLValue + ExternalURL flagext.URLValue + ConfigsAPIURL flagext.URLValue PollInterval time.Duration ClientTimeout time.Duration diff --git a/pkg/chunk/aws/storage_client.go b/pkg/chunk/aws/storage_client.go index 75320c2a673..ba8153e77d6 100644 --- a/pkg/chunk/aws/storage_client.go +++ b/pkg/chunk/aws/storage_client.go @@ -24,6 +24,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" chunk_util "github.com/cortexproject/cortex/pkg/chunk/util" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" awscommon "github.com/weaveworks/common/aws" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" @@ -97,9 +98,9 @@ func init() { // DynamoDBConfig specifies config for a DynamoDB database. type DynamoDBConfig struct { - DynamoDB util.URLValue + DynamoDB flagext.URLValue APILimit float64 - ApplicationAutoScaling util.URLValue + ApplicationAutoScaling flagext.URLValue Metrics MetricsAutoScalingConfig ChunkGangSize int ChunkGetMaxParallelism int @@ -123,7 +124,7 @@ func (cfg *DynamoDBConfig) RegisterFlags(f *flag.FlagSet) { // StorageConfig specifies config for storing data on AWS. type StorageConfig struct { DynamoDBConfig - S3 util.URLValue + S3 flagext.URLValue } // RegisterFlags adds the flags required to config this to the given FlagSet diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index d123b39be23..f3d55392a3e 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -17,8 +17,8 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/chunk/encoding" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/test" "github.com/weaveworks/common/user" @@ -49,7 +49,7 @@ var stores = []struct { var ( storeCfg StoreConfig ) - util.DefaultValues(&storeCfg) + flagext.DefaultValues(&storeCfg) return storeCfg }, }, @@ -59,7 +59,7 @@ var stores = []struct { var ( storeCfg StoreConfig ) - util.DefaultValues(&storeCfg) + flagext.DefaultValues(&storeCfg) storeCfg.WriteDedupeCacheConfig.Cache = cache.NewFifoCache("test", cache.FifoCacheConfig{ Size: 500, @@ -75,7 +75,7 @@ func newTestChunkStore(t *testing.T, schemaName string) Store { var ( storeCfg StoreConfig ) - util.DefaultValues(&storeCfg) + flagext.DefaultValues(&storeCfg) return newTestChunkStoreConfig(t, schemaName, storeCfg) } @@ -84,7 +84,7 @@ func newTestChunkStoreConfig(t *testing.T, schemaName string, storeCfg StoreConf tbmConfig TableManagerConfig schemaCfg = DefaultSchemaConfig("", schemaName, 0) ) - util.DefaultValues(&tbmConfig) + flagext.DefaultValues(&tbmConfig) storage := NewMockStorage() tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage) require.NoError(t, err) @@ -93,7 +93,7 @@ func newTestChunkStoreConfig(t *testing.T, schemaName string, storeCfg StoreConf require.NoError(t, err) var limits validation.Limits - util.DefaultValues(&limits) + flagext.DefaultValues(&limits) limits.MaxQueryLength = 30 * 24 * time.Hour overrides, err := validation.NewOverrides(limits) require.NoError(t, err) @@ -135,25 +135,25 @@ func TestChunkStore_Get(t *testing.T) { fooMetric1 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", } fooMetric2 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", } // barMetric1 is a subset of barMetric2 to test over-matching bug. barMetric1 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", + "bar": "baz", } barMetric2 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } fooChunk1 := dummyChunkFor(now, fooMetric1) @@ -315,14 +315,14 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { now := model.Now() chunk1 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", }) chunk2 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", }) for _, tc := range []struct { @@ -424,7 +424,7 @@ func TestChunkStoreRandom(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -488,7 +488,7 @@ func TestChunkStoreLeastRead(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -534,7 +534,7 @@ func TestIndexCachingWorks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) metric := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", } storeMaker := stores[1] storeCfg := storeMaker.configFn() diff --git a/pkg/chunk/schema_config.go b/pkg/chunk/schema_config.go index e0e175d7bae..84d72bc3cc7 100644 --- a/pkg/chunk/schema_config.go +++ b/pkg/chunk/schema_config.go @@ -11,6 +11,7 @@ import ( yaml "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/mtime" ) @@ -45,20 +46,20 @@ type LegacySchemaConfig struct { // After midnight on this day, we start bucketing indexes by day instead of by // hour. Only the day matters, not the time within the day. - DailyBucketsFrom util.DayValue - Base64ValuesFrom util.DayValue - V4SchemaFrom util.DayValue - V5SchemaFrom util.DayValue - V6SchemaFrom util.DayValue - V9SchemaFrom util.DayValue - BigtableColumnKeyFrom util.DayValue + DailyBucketsFrom flagext.DayValue + Base64ValuesFrom flagext.DayValue + V4SchemaFrom flagext.DayValue + V5SchemaFrom flagext.DayValue + V6SchemaFrom flagext.DayValue + V9SchemaFrom flagext.DayValue + BigtableColumnKeyFrom flagext.DayValue // Config for the index & chunk tables. OriginalTableName string UsePeriodicTables bool - IndexTablesFrom util.DayValue + IndexTablesFrom flagext.DayValue IndexTables PeriodicTableConfig - ChunkTablesFrom util.DayValue + ChunkTablesFrom flagext.DayValue ChunkTables PeriodicTableConfig } diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index b09d90ce9cf..10a41ba06a5 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -18,7 +18,7 @@ func TestFactoryStop(t *testing.T) { schemaConfig chunk.SchemaConfig defaults validation.Limits ) - util.DefaultValues(&cfg, &storeConfig, &schemaConfig, &defaults) + flagext.DefaultValues(&cfg, &storeConfig, &schemaConfig, &defaults) schemaConfig.Configs = []chunk.PeriodConfig{ { From: model.Time(0), diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go index bcbe7b07084..6ad32a62a22 100644 --- a/pkg/chunk/testutils/testutils.go +++ b/pkg/chunk/testutils/testutils.go @@ -6,10 +6,10 @@ import ( "time" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/prometheus/common/model" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/util" ) const ( @@ -26,7 +26,7 @@ type Fixture interface { // Setup a fixture with initial tables func Setup(fixture Fixture, tableName string) (chunk.StorageClient, error) { var tbmConfig chunk.TableManagerConfig - util.DefaultValues(&tbmConfig) + flagext.DefaultValues(&tbmConfig) storageClient, tableClient, schemaConfig, err := fixture.Clients() if err != nil { return nil, err @@ -70,8 +70,8 @@ func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { func dummyChunk(now model.Time) chunk.Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", }) } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index b0e6c840cb7..5536e6f41a5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -23,8 +23,8 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -278,7 +278,7 @@ func prepare(t *testing.T, numIngesters, happyIngesters int, queryDelay time.Dur var cfg Config var limits validation.Limits var clientConfig client.Config - util.DefaultValues(&cfg, &limits, &clientConfig) + flagext.DefaultValues(&cfg, &limits, &clientConfig) limits.IngestionRate = 20 limits.IngestionBurstSize = 20 cfg.ingesterClientFactory = factory diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index 3dd16dbcd7a..e8f7b6a9ca3 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -20,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/test" "github.com/cortexproject/cortex/pkg/util/validation" "github.com/weaveworks/common/user" @@ -30,7 +31,7 @@ const userID = "1" func defaultIngesterTestConfig() Config { consul := ring.NewInMemoryKVClient() cfg := Config{} - util.DefaultValues(&cfg) + flagext.DefaultValues(&cfg) cfg.FlushCheckPeriod = 99999 * time.Hour cfg.MaxChunkIdle = 99999 * time.Hour cfg.ConcurrentFlushes = 1 @@ -44,13 +45,13 @@ func defaultIngesterTestConfig() Config { func defaultClientTestConfig() client.Config { clientConfig := client.Config{} - util.DefaultValues(&clientConfig) + flagext.DefaultValues(&clientConfig) return clientConfig } func defaultLimitsTestConfig() validation.Limits { limits := validation.Limits{} - util.DefaultValues(&limits) + flagext.DefaultValues(&limits) return limits } diff --git a/pkg/querier/correctness/time_flag.go b/pkg/querier/correctness/time_flag.go index 55d4dd3d7ca..88d0bcb1a62 100644 --- a/pkg/querier/correctness/time_flag.go +++ b/pkg/querier/correctness/time_flag.go @@ -10,7 +10,7 @@ type TimeValue struct { set bool } -// NewTimeValue makes a new DayValue; will round t down to the nearest midnight. +// NewTimeValue makes a new TimeValue; will round t down to the nearest midnight. func NewTimeValue(t time.Time) TimeValue { return TimeValue{ Time: t, diff --git a/pkg/querier/frontend/frontend_test.go b/pkg/querier/frontend/frontend_test.go index 30a854d97a4..723a58b1cd2 100644 --- a/pkg/querier/frontend/frontend_test.go +++ b/pkg/querier/frontend/frontend_test.go @@ -9,6 +9,7 @@ import ( "sync/atomic" "testing" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/go-kit/kit/log" otgrpc "github.com/opentracing-contrib/go-grpc" "github.com/opentracing-contrib/go-stdlib/nethttp" @@ -21,7 +22,6 @@ import ( "github.com/weaveworks/common/middleware" "google.golang.org/grpc" - "github.com/cortexproject/cortex/pkg/util" "github.com/weaveworks/common/user" ) @@ -131,7 +131,7 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) { config Config workerConfig WorkerConfig ) - util.DefaultValues(&config, &workerConfig) + flagext.DefaultValues(&config, &workerConfig) config.SplitQueriesByDay = true // localhost:0 prevents firewall warnings on Mac OS X. diff --git a/pkg/querier/frontend/results_cache_test.go b/pkg/querier/frontend/results_cache_test.go index 501ca2e7e37..b198a59d330 100644 --- a/pkg/querier/frontend/results_cache_test.go +++ b/pkg/querier/frontend/results_cache_test.go @@ -7,7 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" client "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/wire" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" @@ -191,7 +191,7 @@ func TestResultsCache(t *testing.T) { func TestResultsCacheRecent(t *testing.T) { var cfg resultsCacheConfig - util.DefaultValues(&cfg) + flagext.DefaultValues(&cfg) cfg.cacheConfig.Cache = cache.NewMockCache() rcm, err := newResultsCacheMiddleware(cfg) require.NoError(t, err) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 8369fd572f0..2ba6b756331 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -20,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/iterators" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/wire" "github.com/weaveworks/common/user" ) @@ -123,7 +124,7 @@ var ( func TestQuerier(t *testing.T) { var cfg Config - util.DefaultValues(&cfg) + flagext.DefaultValues(&cfg) for _, query := range queries { for _, encoding := range encodings { diff --git a/pkg/ruler/configs.go b/pkg/ruler/configs.go index fad8af529a7..d209369e629 100644 --- a/pkg/ruler/configs.go +++ b/pkg/ruler/configs.go @@ -9,7 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs" configs_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/db" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" ) // ConfigStoreConfig says where we can find the ruler configs. @@ -17,7 +17,7 @@ type ConfigStoreConfig struct { DBConfig db.Config // DEPRECATED - ConfigsAPIURL util.URLValue + ConfigsAPIURL flagext.URLValue // DEPRECATED. HTTP timeout duration for requests made to the Weave Cloud // configs service. diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 1c240ca2eb7..9caaa40e61a 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -33,6 +33,7 @@ import ( "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/user" ) @@ -78,14 +79,14 @@ func init() { // Config is the configuration for the recording rules server. type Config struct { // This is used for template expansion in alerts; must be a valid URL - ExternalURL util.URLValue + ExternalURL flagext.URLValue // How frequently to evaluate rules by default. EvaluationInterval time.Duration NumWorkers int // URL of the Alertmanager to send notifications to. - AlertmanagerURL util.URLValue + AlertmanagerURL flagext.URLValue // Whether to use DNS SRV records to discover alertmanagers. AlertmanagerDiscovery bool // How long to wait between refreshing the list of alertmanagers based on diff --git a/pkg/util/flagext/day.go b/pkg/util/flagext/day.go new file mode 100644 index 00000000000..bdf893a243b --- /dev/null +++ b/pkg/util/flagext/day.go @@ -0,0 +1,45 @@ +package flagext + +import ( + "time" + + "github.com/prometheus/common/model" +) + +const secondsInDay = 24 * 60 * 60 + +// DayValue is a model.Time that can be used as a flag. +// NB it only parses days! +type DayValue struct { + model.Time + set bool +} + +// NewDayValue makes a new DayValue; will round t down to the nearest midnight. +func NewDayValue(t model.Time) DayValue { + return DayValue{ + Time: model.TimeFromUnix((t.Unix() / secondsInDay) * secondsInDay), + set: true, + } +} + +// String implements flag.Value +func (v DayValue) String() string { + return v.Time.Time().Format(time.RFC3339) +} + +// Set implements flag.Value +func (v *DayValue) Set(s string) error { + t, err := time.Parse("2006-01-02", s) + if err != nil { + return err + } + v.Time = model.TimeFromUnix(t.Unix()) + v.set = true + return nil +} + +// IsSet returns true is the DayValue has been set. +func (v *DayValue) IsSet() bool { + return v.set +} diff --git a/pkg/util/flagext/register.go b/pkg/util/flagext/register.go new file mode 100644 index 00000000000..7a82b047bbc --- /dev/null +++ b/pkg/util/flagext/register.go @@ -0,0 +1,24 @@ +package flagext + +import "flag" + +// Registerer is a thing that can RegisterFlags +type Registerer interface { + RegisterFlags(*flag.FlagSet) +} + +// RegisterFlags registers flags with the provided Registerers +func RegisterFlags(rs ...Registerer) { + for _, r := range rs { + r.RegisterFlags(flag.CommandLine) + } +} + +// DefaultValues intiates a set of configs (Registerers) with their defaults. +func DefaultValues(rs ...Registerer) { + fs := flag.NewFlagSet("", flag.PanicOnError) + for _, r := range rs { + r.RegisterFlags(fs) + } + fs.Parse([]string{}) +} diff --git a/pkg/util/flagext/url.go b/pkg/util/flagext/url.go new file mode 100644 index 00000000000..6e978009642 --- /dev/null +++ b/pkg/util/flagext/url.go @@ -0,0 +1,26 @@ +package flagext + +import "net/url" + +// URLValue is a url.URL that can be used as a flag. +type URLValue struct { + *url.URL +} + +// String implements flag.Value +func (v URLValue) String() string { + if v.URL == nil { + return "" + } + return v.URL.String() +} + +// Set implements flag.Value +func (v *URLValue) Set(s string) error { + u, err := url.Parse(s) + if err != nil { + return err + } + v.URL = u + return nil +} diff --git a/pkg/util/flags.go b/pkg/util/flags.go deleted file mode 100644 index 2e8ed4a3949..00000000000 --- a/pkg/util/flags.go +++ /dev/null @@ -1,91 +0,0 @@ -package util - -import ( - "flag" - "net/url" - "time" - - "github.com/prometheus/common/model" -) - -const secondsInDay = 24 * 60 * 60 - -// Registerer is a thing that can RegisterFlags -type Registerer interface { - RegisterFlags(*flag.FlagSet) -} - -// RegisterFlags registers flags with the provided Registerers -func RegisterFlags(rs ...Registerer) { - for _, r := range rs { - r.RegisterFlags(flag.CommandLine) - } -} - -// DefaultValues intiates a set of configs (Registerers) with their defaults. -func DefaultValues(rs ...Registerer) { - fs := flag.NewFlagSet("", flag.PanicOnError) - for _, r := range rs { - r.RegisterFlags(fs) - } - fs.Parse([]string{}) -} - -// DayValue is a model.Time that can be used as a flag. -// NB it only parses days! -type DayValue struct { - model.Time - set bool -} - -// NewDayValue makes a new DayValue; will round t down to the nearest midnight. -func NewDayValue(t model.Time) DayValue { - return DayValue{ - Time: model.TimeFromUnix((t.Unix() / secondsInDay) * secondsInDay), - set: true, - } -} - -// String implements flag.Value -func (v DayValue) String() string { - return v.Time.Time().Format(time.RFC3339) -} - -// Set implements flag.Value -func (v *DayValue) Set(s string) error { - t, err := time.Parse("2006-01-02", s) - if err != nil { - return err - } - v.Time = model.TimeFromUnix(t.Unix()) - v.set = true - return nil -} - -// IsSet returns true is the DayValue has been set. -func (v *DayValue) IsSet() bool { - return v.set -} - -// URLValue is a url.URL that can be used as a flag. -type URLValue struct { - *url.URL -} - -// String implements flag.Value -func (v URLValue) String() string { - if v.URL == nil { - return "" - } - return v.URL.String() -} - -// Set implements flag.Value -func (v *URLValue) Set(s string) error { - u, err := url.Parse(s) - if err != nil { - return err - } - v.URL = u - return nil -} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 89aed26770f..c1dea6ec61b 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -4,7 +4,7 @@ import ( "flag" "time" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" ) // Limits describe all the limits for users; can be used to describe global default @@ -63,7 +63,7 @@ func (l *Limits) UnmarshalYAML(unmarshal func(interface{}) error) error { // We want to set c to the defaults and then overwrite it with the input. // To make unmarshal fill the plain data struct rather than calling UnmarshalYAML // again, we have to hide it using a type indirection. See prometheus/config. - util.DefaultValues(l) + flagext.DefaultValues(l) type plain Limits return unmarshal((*plain)(l)) } diff --git a/pkg/util/validation/validate_test.go b/pkg/util/validation/validate_test.go index 03614370321..e853566b4d6 100644 --- a/pkg/util/validation/validate_test.go +++ b/pkg/util/validation/validate_test.go @@ -9,14 +9,14 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/weaveworks/common/httpgrpc" ) func TestValidateLabels(t *testing.T) { var cfg Limits userID := "testUser" - util.DefaultValues(&cfg) + flagext.DefaultValues(&cfg) cfg.MaxLabelValueLength = 25 cfg.MaxLabelNameLength = 25 cfg.MaxLabelNamesPerSeries = 64 From f22bf0151965855e790b4d666141bdfae5ef3d94 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 22 Nov 2018 13:52:15 +0000 Subject: [PATCH 2/8] Specify multiple interfaces to look for an address on. This allows us to start Cortex on the Mac. Signed-off-by: Tom Wilkie --- pkg/ring/lifecycler.go | 8 +++++--- pkg/ring/lifecycler_test.go | 6 +++--- pkg/util/flagext/strings.go | 18 +++++++++++++++++ pkg/util/net.go | 40 +++++++++++++++++++------------------ 4 files changed, 47 insertions(+), 25 deletions(-) create mode 100644 pkg/util/flagext/strings.go diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index ec0ea7ddfa9..4e2999ad140 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" ) const ( @@ -51,7 +52,7 @@ type LifecyclerConfig struct { // For testing, you can override the address and ID of this ingester Addr string Port int - InfName string + InfNames []string ID string SkipUnregister bool } @@ -72,7 +73,8 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) { os.Exit(1) } - f.StringVar(&cfg.InfName, "ingester.interface", "eth0", "Name of network interface to read address from.") + cfg.InfNames = []string{"eth0", "en0"} + f.Var((*flagext.Strings)(&cfg.InfNames), "ingester.interface", "Name of network interface to read address from.") f.StringVar(&cfg.Addr, "ingester.addr", "", "IP address to advertise in consul.") f.IntVar(&cfg.Port, "ingester.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).") f.StringVar(&cfg.ID, "ingester.ID", hostname, "ID to register into consul.") @@ -127,7 +129,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life addr := cfg.Addr if addr == "" { var err error - addr, err = util.GetFirstAddressOf(cfg.InfName) + addr, err = util.GetFirstAddressOf(cfg.InfNames) if err != nil { return nil, err } diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index c8aa4e1f715..0801e224cc2 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/test" ) @@ -26,7 +26,7 @@ func (f *flushTransferer) TransferOut(ctx context.Context) error { func TestRingNormaliseMigration(t *testing.T) { var ringConfig Config - util.DefaultValues(&ringConfig) + flagext.DefaultValues(&ringConfig) ringConfig.Mock = NewInMemoryKVClient() r, err := New(ringConfig) @@ -35,7 +35,7 @@ func TestRingNormaliseMigration(t *testing.T) { // Add an 'ingester' with denormalised tokens. var lifecyclerConfig1 LifecyclerConfig - util.DefaultValues(&lifecyclerConfig1) + flagext.DefaultValues(&lifecyclerConfig1) lifecyclerConfig1.Addr = "0.0.0.0" lifecyclerConfig1.Port = 1 lifecyclerConfig1.RingConfig = ringConfig diff --git a/pkg/util/flagext/strings.go b/pkg/util/flagext/strings.go new file mode 100644 index 00000000000..08242cd161f --- /dev/null +++ b/pkg/util/flagext/strings.go @@ -0,0 +1,18 @@ +package flagext + +import ( + "fmt" +) + +type Strings []string + +// String implements flag.Value +func (ss Strings) String() string { + return fmt.Sprintf("%s", []string(ss)) +} + +// Set implements flag.Value +func (ss *Strings) Set(s string) error { + *ss = append(*ss, s) + return nil +} diff --git a/pkg/util/net.go b/pkg/util/net.go index 8c66e084433..f36344f81ac 100644 --- a/pkg/util/net.go +++ b/pkg/util/net.go @@ -5,29 +5,31 @@ import ( "net" ) -// GetFirstAddressOf returns the first IPv4 address of the supplied interface name. -func GetFirstAddressOf(name string) (string, error) { - inf, err := net.InterfaceByName(name) - if err != nil { - return "", err - } +// GetFirstAddressOf returns the first IPv4 address of the supplied interface names. +func GetFirstAddressOf(names []string) (string, error) { + for _, name := range names { + inf, err := net.InterfaceByName(name) + if err != nil { + continue + } - addrs, err := inf.Addrs() - if err != nil { - return "", err - } - if len(addrs) <= 0 { - return "", fmt.Errorf("No address found for %s", name) - } + addrs, err := inf.Addrs() + if err != nil { + return "", err + } + if len(addrs) <= 0 { + return "", fmt.Errorf("No address found for %s", name) + } - for _, addr := range addrs { - switch v := addr.(type) { - case *net.IPNet: - if ip := v.IP.To4(); ip != nil { - return v.IP.String(), nil + for _, addr := range addrs { + switch v := addr.(type) { + case *net.IPNet: + if ip := v.IP.To4(); ip != nil { + return v.IP.String(), nil + } } } } - return "", fmt.Errorf("No address found for %s", name) + return "", fmt.Errorf("No address found for %s", names) } From a56a6c81163ddb01970cb734f67118264fa82d04 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 22 Nov 2018 13:59:48 +0000 Subject: [PATCH 3/8] Add yaml tags to ring.Config. Signed-off-by: Tom Wilkie --- pkg/ring/ring.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 6ba2b10ace1..c6da8ef0015 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -56,10 +56,10 @@ var ErrEmptyRing = errors.New("empty ring") // Config for a Ring type Config struct { - ConsulConfig - store string - HeartbeatTimeout time.Duration - ReplicationFactor int + ConsulConfig `yaml:"consul,omitempty"` + Store string `yaml:"store,omitempty"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout,omitempty"` + ReplicationFactor int `yaml:"replication_factor,omitempty"` Mock KVClient } @@ -68,7 +68,7 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ConsulConfig.RegisterFlags(f) - f.StringVar(&cfg.store, "ring.store", "consul", "Backend storage to use for the ring (consul, inmemory).") + f.StringVar(&cfg.Store, "ring.store", "consul", "Backend storage to use for the ring (consul, inmemory).") f.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") f.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") } @@ -99,7 +99,7 @@ func New(cfg Config) (*Ring, error) { if store == nil { var err error - switch cfg.store { + switch cfg.Store { case "consul": codec := ProtoCodec{Factory: ProtoDescFactory} store, err = NewConsulClient(cfg.ConsulConfig, codec) From 6dc592ed8547d357f5562d8aa8ec2e384c91f4c7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 22 Nov 2018 14:32:14 +0000 Subject: [PATCH 4/8] Rename some config, so we can override it in YAML. Signed-off-by: Tom Wilkie --- pkg/ring/lifecycler.go | 16 ++++++++-------- pkg/ring/ring.go | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 4e2999ad140..0978850587a 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -38,16 +38,15 @@ var ( // LifecyclerConfig is the config to build a Lifecycler. type LifecyclerConfig struct { - KVClient KVClient - RingConfig Config + RingConfig Config `yaml:"ring,omitempty"` // Config for the ingester lifecycle control ListenPort *int - NumTokens int - HeartbeatPeriod time.Duration - JoinAfter time.Duration - ClaimOnRollout bool - NormaliseTokens bool + NumTokens int `yaml:"num_tokens,omitempty"` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period,omitempty"` + JoinAfter time.Duration `yaml:"join_after,omitempty"` + ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"` + NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` // For testing, you can override the address and ID of this ingester Addr string @@ -55,6 +54,7 @@ type LifecyclerConfig struct { InfNames []string ID string SkipUnregister bool + KVClient KVClient } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -120,7 +120,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life if kvstore == nil { var err error codec := ProtoCodec{Factory: ProtoDescFactory} - kvstore, err = NewConsulClient(cfg.RingConfig.ConsulConfig, codec) + kvstore, err = NewConsulClient(cfg.RingConfig.Consul, codec) if err != nil { return nil, err } diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index c6da8ef0015..46e586aefc9 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -56,7 +56,7 @@ var ErrEmptyRing = errors.New("empty ring") // Config for a Ring type Config struct { - ConsulConfig `yaml:"consul,omitempty"` + Consul ConsulConfig `yaml:"consul,omitempty"` Store string `yaml:"store,omitempty"` HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout,omitempty"` ReplicationFactor int `yaml:"replication_factor,omitempty"` @@ -66,7 +66,7 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - cfg.ConsulConfig.RegisterFlags(f) + cfg.Consul.RegisterFlags(f) f.StringVar(&cfg.Store, "ring.store", "consul", "Backend storage to use for the ring (consul, inmemory).") f.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") @@ -102,7 +102,7 @@ func New(cfg Config) (*Ring, error) { switch cfg.Store { case "consul": codec := ProtoCodec{Factory: ProtoDescFactory} - store, err = NewConsulClient(cfg.ConsulConfig, codec) + store, err = NewConsulClient(cfg.Consul, codec) case "inmemory": store = NewInMemoryKVClient() } From 4e3a3c3abcb1d67ace4b6e86e5ec7131dd6dd4d8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 22 Nov 2018 14:50:31 +0000 Subject: [PATCH 5/8] Make the ring lifecycler use the same code to pick the KSStore implementation as the ring does. Also, remove the two instances of KVClient mock hooks. Signed-off-by: Tom Wilkie --- pkg/ingester/lifecycle_test.go | 8 ++-- pkg/ring/consul_client.go | 48 -------------------- pkg/ring/kvstore.go | 80 ++++++++++++++++++++++++++++++++++ pkg/ring/lifecycler.go | 18 +++----- pkg/ring/lifecycler_test.go | 1 - pkg/ring/ring.go | 17 ++------ 6 files changed, 93 insertions(+), 79 deletions(-) create mode 100644 pkg/ring/kvstore.go diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go index e8f7b6a9ca3..81f91dc7d73 100644 --- a/pkg/ingester/lifecycle_test.go +++ b/pkg/ingester/lifecycle_test.go @@ -35,7 +35,7 @@ func defaultIngesterTestConfig() Config { cfg.FlushCheckPeriod = 99999 * time.Hour cfg.MaxChunkIdle = 99999 * time.Hour cfg.ConcurrentFlushes = 1 - cfg.LifecyclerConfig.KVClient = consul + cfg.LifecyclerConfig.RingConfig.Mock = consul cfg.LifecyclerConfig.NumTokens = 1 cfg.LifecyclerConfig.ListenPort = func(i int) *int { return &i }(0) cfg.LifecyclerConfig.Addr = "localhost" @@ -69,7 +69,7 @@ func TestIngesterRestart(t *testing.T) { } test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return numTokens(config.LifecyclerConfig.KVClient, "localhost") + return numTokens(config.LifecyclerConfig.RingConfig.Mock, "localhost") }) { @@ -81,7 +81,7 @@ func TestIngesterRestart(t *testing.T) { time.Sleep(200 * time.Millisecond) test.Poll(t, 100*time.Millisecond, 1, func() interface{} { - return numTokens(config.LifecyclerConfig.KVClient, "localhost") + return numTokens(config.LifecyclerConfig.RingConfig.Mock, "localhost") }) } @@ -123,7 +123,7 @@ func TestIngesterTransfer(t *testing.T) { // Start a second ingester, but let it go into PENDING cfg2 := defaultIngesterTestConfig() - cfg2.LifecyclerConfig.KVClient = cfg1.LifecyclerConfig.KVClient + cfg2.LifecyclerConfig.RingConfig.Mock = cfg1.LifecyclerConfig.RingConfig.Mock cfg2.LifecyclerConfig.ID = "ingester2" cfg2.LifecyclerConfig.Addr = "ingester2" cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second diff --git a/pkg/ring/consul_client.go b/pkg/ring/consul_client.go index ad826f484cd..98374e59c44 100644 --- a/pkg/ring/consul_client.go +++ b/pkg/ring/consul_client.go @@ -8,8 +8,6 @@ import ( "time" "github.com/go-kit/kit/log/level" - "github.com/golang/protobuf/proto" - "github.com/golang/snappy" consul "github.com/hashicorp/consul/api" cleanhttp "github.com/hashicorp/go-cleanhttp" @@ -39,25 +37,6 @@ func (cfg *ConsulConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ConsistentReads, "consul.consistent-reads", true, "Enable consistent reads to consul.") } -// KVClient is a high-level client for Consul, that exposes operations -// such as CAS and Watch which take callbacks. It also deals with serialisation -// by having an instance factory passed in to methods and deserialising into that. -type KVClient interface { - CAS(ctx context.Context, key string, f CASCallback) error - WatchKey(ctx context.Context, key string, f func(interface{}) bool) - Get(ctx context.Context, key string) (interface{}, error) - PutBytes(ctx context.Context, key string, buf []byte) error -} - -// CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. -type CASCallback func(in interface{}) (out interface{}, retry bool, err error) - -// Codec allows the consult client to serialise and deserialise values. -type Codec interface { - Decode([]byte) (interface{}, error) - Encode(interface{}) ([]byte, error) -} - type kv interface { CAS(p *consul.KVPair, q *consul.WriteOptions) (bool, *consul.WriteMeta, error) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consul.QueryMeta, error) @@ -104,33 +83,6 @@ var ( ErrNotFound = fmt.Errorf("Not found") ) -// ProtoCodec is a Codec for proto/snappy -type ProtoCodec struct { - Factory func() proto.Message -} - -// Decode implements Codec -func (p ProtoCodec) Decode(bytes []byte) (interface{}, error) { - out := p.Factory() - bytes, err := snappy.Decode(nil, bytes) - if err != nil { - return nil, err - } - if err := proto.Unmarshal(bytes, out); err != nil { - return nil, err - } - return out, nil -} - -// Encode implements Codec -func (p ProtoCodec) Encode(msg interface{}) ([]byte, error) { - bytes, err := proto.Marshal(msg.(proto.Message)) - if err != nil { - return nil, err - } - return snappy.Encode(nil, bytes), nil -} - // CAS atomically modifies a value in a callback. // If value doesn't exist you'll get nil as an argument to your callback. func (c *consulClient) CAS(ctx context.Context, key string, f CASCallback) error { diff --git a/pkg/ring/kvstore.go b/pkg/ring/kvstore.go new file mode 100644 index 00000000000..6961bf7efdc --- /dev/null +++ b/pkg/ring/kvstore.go @@ -0,0 +1,80 @@ +package ring + +import ( + "context" + "fmt" + "sync" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" +) + +var inmemoryStoreInit sync.Once +var inmemoryStore KVClient + +// KVClient is a high-level client for Consul, that exposes operations +// such as CAS and Watch which take callbacks. It also deals with serialisation +// by having an instance factory passed in to methods and deserialising into that. +type KVClient interface { + CAS(ctx context.Context, key string, f CASCallback) error + WatchKey(ctx context.Context, key string, f func(interface{}) bool) + Get(ctx context.Context, key string) (interface{}, error) + PutBytes(ctx context.Context, key string, buf []byte) error +} + +// CASCallback is the type of the callback to CAS. If err is nil, out must be non-nil. +type CASCallback func(in interface{}) (out interface{}, retry bool, err error) + +func newKVStore(cfg Config) (KVClient, error) { + if cfg.Mock != nil { + return cfg.Mock, nil + } + + switch cfg.Store { + case "consul": + codec := ProtoCodec{Factory: ProtoDescFactory} + return NewConsulClient(cfg.Consul, codec) + case "inmemory": + // If we use the in-memory store, make sure everyone gets the same instance + // within the same process. + inmemoryStoreInit.Do(func() { + inmemoryStore = NewInMemoryKVClient() + }) + return inmemoryStore, nil + default: + return nil, fmt.Errorf("invalid KV store type: %s", cfg.Store) + } +} + +// Codec allows the consult client to serialise and deserialise values. +type Codec interface { + Decode([]byte) (interface{}, error) + Encode(interface{}) ([]byte, error) +} + +// ProtoCodec is a Codec for proto/snappy +type ProtoCodec struct { + Factory func() proto.Message +} + +// Decode implements Codec +func (p ProtoCodec) Decode(bytes []byte) (interface{}, error) { + out := p.Factory() + bytes, err := snappy.Decode(nil, bytes) + if err != nil { + return nil, err + } + if err := proto.Unmarshal(bytes, out); err != nil { + return nil, err + } + return out, nil +} + +// Encode implements Codec +func (p ProtoCodec) Encode(msg interface{}) ([]byte, error) { + bytes, err := proto.Marshal(msg.(proto.Message)) + if err != nil { + return nil, err + } + return snappy.Encode(nil, bytes), nil +} diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 0978850587a..6a397b468c9 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -54,7 +54,6 @@ type LifecyclerConfig struct { InfNames []string ID string SkipUnregister bool - KVClient KVClient } // RegisterFlags adds the flags required to config this to the given FlagSet @@ -116,16 +115,6 @@ type Lifecycler struct { // NewLifecycler makes and starts a new Lifecycler. func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Lifecycler, error) { - kvstore := cfg.KVClient - if kvstore == nil { - var err error - codec := ProtoCodec{Factory: ProtoDescFactory} - kvstore, err = NewConsulClient(cfg.RingConfig.Consul, codec) - if err != nil { - return nil, err - } - } - addr := cfg.Addr if addr == "" { var err error @@ -139,10 +128,15 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer) (*Life port = *cfg.ListenPort } + store, err := newKVStore(cfg.RingConfig) + if err != nil { + return nil, err + } + l := &Lifecycler{ cfg: cfg, flushTransferer: flushTransferer, - KVStore: kvstore, + KVStore: store, addr: fmt.Sprintf("%s:%d", addr, port), ID: cfg.ID, diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 0801e224cc2..718325a567a 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -39,7 +39,6 @@ func TestRingNormaliseMigration(t *testing.T) { lifecyclerConfig1.Addr = "0.0.0.0" lifecyclerConfig1.Port = 1 lifecyclerConfig1.RingConfig = ringConfig - lifecyclerConfig1.KVClient = ringConfig.Mock lifecyclerConfig1.NumTokens = 1 lifecyclerConfig1.ClaimOnRollout = true lifecyclerConfig1.ID = "ing1" diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 46e586aefc9..feb4880d06b 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -95,20 +95,9 @@ func New(cfg Config) (*Ring, error) { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } - store := cfg.Mock - if store == nil { - var err error - - switch cfg.Store { - case "consul": - codec := ProtoCodec{Factory: ProtoDescFactory} - store, err = NewConsulClient(cfg.Consul, codec) - case "inmemory": - store = NewInMemoryKVClient() - } - if err != nil { - return nil, err - } + store, err := newKVStore(cfg) + if err != nil { + return nil, err } r := &Ring{ From ff28de7545752c299091f26bff7879a9ea4b4c43 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 26 Nov 2018 09:23:22 +0100 Subject: [PATCH 6/8] All URLValues to be unmarshaled from YAML. Signed-off-by: Tom Wilkie --- pkg/util/flagext/url.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/util/flagext/url.go b/pkg/util/flagext/url.go index 6e978009642..9d470b5051d 100644 --- a/pkg/util/flagext/url.go +++ b/pkg/util/flagext/url.go @@ -24,3 +24,18 @@ func (v *URLValue) Set(s string) error { v.URL = u return nil } + +// UnmarshalYAML implements yaml.Unmarshaler. +func (v *URLValue) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + u, err := url.Parse(s) + if err != nil { + return err + } + v.URL = u + return nil +} From be3017a1ce788c394ffb487d345d2b8dfb639dfb Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 29 Nov 2018 18:57:59 +0100 Subject: [PATCH 7/8] Lint, build and test. Signed-off-by: Tom Wilkie --- cmd/lite/main.go | 3 +-- cmd/querier/main.go | 2 +- pkg/chunk/chunk_store_test.go | 32 ++++++++++++++++---------------- pkg/chunk/testutils/testutils.go | 4 ++-- pkg/util/flagext/strings.go | 1 + 5 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cmd/lite/main.go b/cmd/lite/main.go index 7feb74c54e3..3c4c0aa16be 100644 --- a/cmd/lite/main.go +++ b/cmd/lite/main.go @@ -86,7 +86,6 @@ func main() { } prometheus.MustRegister(r) defer r.Stop() - ingesterConfig.LifecyclerConfig.KVClient = r.KVClient dist, err := distributor.New(distributorConfig, ingesterClientConfig, overrides, r) if err != nil { @@ -150,7 +149,7 @@ func main() { map[string]string{}, // TODO: include configuration flags func(f http.HandlerFunc) http.HandlerFunc { return f }, func() *tsdb.DB { return nil }, // Only needed for admin APIs. - false, // Disable admin APIs. + false, // Disable admin APIs. util.Logger, querier.DummyRulesRetriever{}, ) diff --git a/cmd/querier/main.go b/cmd/querier/main.go index 8b868e49e78..f69511c9b34 100644 --- a/cmd/querier/main.go +++ b/cmd/querier/main.go @@ -115,7 +115,7 @@ func main() { map[string]string{}, // TODO: include configuration flags func(f http.HandlerFunc) http.HandlerFunc { return f }, func() *tsdb.DB { return nil }, // Only needed for admin APIs. - false, // Disable admin APIs. + false, // Disable admin APIs. util.Logger, querier.DummyRulesRetriever{}, ) diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index f3d55392a3e..c6e1e824653 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -135,25 +135,25 @@ func TestChunkStore_Get(t *testing.T) { fooMetric1 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", } fooMetric2 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", } // barMetric1 is a subset of barMetric2 to test over-matching bug. barMetric1 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", + "bar": "baz", } barMetric2 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } fooChunk1 := dummyChunkFor(now, fooMetric1) @@ -315,14 +315,14 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { now := model.Now() chunk1 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", }) chunk2 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", }) for _, tc := range []struct { @@ -424,7 +424,7 @@ func TestChunkStoreRandom(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -488,7 +488,7 @@ func TestChunkStoreLeastRead(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -534,7 +534,7 @@ func TestIndexCachingWorks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) metric := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", } storeMaker := stores[1] storeCfg := storeMaker.configFn() diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go index 6ad32a62a22..dc2a7f9b7d6 100644 --- a/pkg/chunk/testutils/testutils.go +++ b/pkg/chunk/testutils/testutils.go @@ -70,8 +70,8 @@ func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { func dummyChunk(now model.Time) chunk.Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", }) } diff --git a/pkg/util/flagext/strings.go b/pkg/util/flagext/strings.go index 08242cd161f..f70cb182dc8 100644 --- a/pkg/util/flagext/strings.go +++ b/pkg/util/flagext/strings.go @@ -4,6 +4,7 @@ import ( "fmt" ) +// Strings is a list of strings. type Strings []string // String implements flag.Value From 6c6d089138917b241e09bc7063a257f71bcb6185 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 4 Dec 2018 14:01:35 +0000 Subject: [PATCH 8/8] Allow setting interface names via yaml. Signed-off-by: Tom Wilkie --- pkg/ring/lifecycler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 6a397b468c9..e165d317d54 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -47,11 +47,11 @@ type LifecyclerConfig struct { JoinAfter time.Duration `yaml:"join_after,omitempty"` ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"` NormaliseTokens bool `yaml:"normalise_tokens,omitempty"` + InfNames []string `yaml:"interface_names"` // For testing, you can override the address and ID of this ingester Addr string Port int - InfNames []string ID string SkipUnregister bool }