diff --git a/.gitignore b/.gitignore index 6984fdaed6a..ab8bb3d8363 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ -cmd/cortex/cortex -cmd/cortex_table_manager/cortex_table_manager +cmd/distributor/distributor +cmd/ingester/ingester +cmd/querier/querier +cmd/ruler/ruler +cmd/table-manager/table-manager .uptodate .pkg *.pb.go diff --git a/Makefile b/Makefile index 01a06a7c1a2..55795c3f9dc 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ # Boiler plate for bulding Docker containers. # All this must go at top of file I'm afraid. -IMAGE_PREFIX := weaveworks +IMAGE_PREFIX := weaveworks/cortex- IMAGE_TAG := $(shell ./tools/image-tag) UPTODATE := .uptodate @@ -12,40 +12,43 @@ UPTODATE := .uptodate # Dependencies (i.e. things that go in the image) still need to be explicitly # declared. %/$(UPTODATE): %/Dockerfile - $(SUDO) docker build -t $(IMAGE_PREFIX)/$(shell basename $(@D)) $(@D)/ - $(SUDO) docker tag $(IMAGE_PREFIX)/$(shell basename $(@D)) $(IMAGE_PREFIX)/$(shell basename $(@D)):$(IMAGE_TAG) + $(SUDO) docker build -t $(IMAGE_PREFIX)$(shell basename $(@D)) $(@D)/ + $(SUDO) docker tag $(IMAGE_PREFIX)$(shell basename $(@D)) $(IMAGE_PREFIX)$(shell basename $(@D)):$(IMAGE_TAG) touch $@ # Get a list of directories containing Dockerfiles DOCKERFILES := $(shell find . -type f -name Dockerfile ! -path "./tools/*" ! -path "./vendor/*") UPTODATE_FILES := $(patsubst %/Dockerfile,%/$(UPTODATE),$(DOCKERFILES)) -DOCKER_IMAGE_DIRS=$(patsubst %/Dockerfile,%,$(DOCKERFILES)) -IMAGE_NAMES=$(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)/%,$(shell basename $(dir)))) +DOCKER_IMAGE_DIRS := $(patsubst %/Dockerfile,%,$(DOCKERFILES)) +IMAGE_NAMES := $(foreach dir,$(DOCKER_IMAGE_DIRS),$(patsubst %,$(IMAGE_PREFIX)%,$(shell basename $(dir)))) images: $(info $(IMAGE_NAMES)) + @echo > /dev/null +# Generating proto code is automated. PROTO_DEFS := $(shell find . -type f -name "*.proto" ! -path "./tools/*" ! 
-path "./vendor/*") PROTO_GOS := $(patsubst %.proto,%.pb.go,$(PROTO_DEFS)) -# List of exes please -CORTEX_EXE := ./cmd/cortex/cortex -CORTEX_TABLE_MANAGER_EXE := ./cmd/cortex_table_manager/cortex_table_manager -EXES = $(CORTEX_EXE) $(CORTEX_TABLE_MANAGER_EXE) - +# Building binaries is now automated. The convention is to build a binary +# for every directory with main.go in it, in the ./cmd directory. +MAIN_GO := $(shell find ./cmd -type f -name main.go ! -path "./tools/*" ! -path "./vendor/*") +EXES := $(foreach exe, $(patsubst ./cmd/%/main.go, %, $(MAIN_GO)), ./cmd/$(exe)/$(exe)) +GO_FILES := $(shell find . -name '*.go' ! -path "./cmd/*" ! -path "./tools/*" ! -path "./vendor/*") +define dep_exe +$(1): $(dir $(1))/main.go $(GO_FILES) ui/bindata.go $(PROTO_GOS) +$(dir $(1))$(UPTODATE): $(1) +endef +$(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe)))) + +# Manually declared dependancies And what goes into each exe +%.pb.go: %.proto all: $(UPTODATE_FILES) test: $(PROTO_GOS) - -# And what goes into each exe -$(CORTEX_EXE): $(shell find . -name '*.go' ! -path "./tools/*" ! 
-path "./vendor/*") ui/bindata.go $(PROTO_GOS) -$(CORTEX_TABLE_MANAGER_EXE): $(shell find ./chunk/ -name '*.go') cmd/cortex_table_manager/main.go -%.pb.go: %.proto ui/bindata.go: $(shell find ui/static ui/templates) test: $(PROTO_GOS) # And now what goes into each image -cortex-build/$(UPTODATE): cortex-build/* -cmd/cortex/$(UPTODATE): $(CORTEX_EXE) -cmd/cortex_table_manager/$(UPTODATE): $(CORTEX_TABLE_MANAGER_EXE) +build/$(UPTODATE): build/* # All the boiler plate for building golang follows: SUDO := $(shell docker info >/dev/null 2>&1 || echo "sudo -E") @@ -63,32 +66,32 @@ NETGO_CHECK = @strings $@ | grep cgo_stub\\\.go >/dev/null || { \ ifeq ($(BUILD_IN_CONTAINER),true) -$(EXES) $(PROTO_GOS) ui/bindata.go lint test shell: cortex-build/$(UPTODATE) +$(EXES) $(PROTO_GOS) ui/bindata.go lint test shell: build/$(UPTODATE) @mkdir -p $(shell pwd)/.pkg $(SUDO) docker run $(RM) -ti \ -v $(shell pwd)/.pkg:/go/pkg \ -v $(shell pwd):/go/src/github.com/weaveworks/cortex \ - $(IMAGE_PREFIX)/cortex-build $@ + $(IMAGE_PREFIX)build $@ else -$(EXES): cortex-build/$(UPTODATE) +$(EXES): build/$(UPTODATE) go build $(GO_FLAGS) -o $@ ./$(@D) $(NETGO_CHECK) -%.pb.go: cortex-build/$(UPTODATE) +%.pb.go: build/$(UPTODATE) protoc -I ./vendor:./$(@D) --go_out=plugins=grpc:./$(@D) ./$(patsubst %.pb.go,%.proto,$@) -ui/bindata.go: cortex-build/$(UPTODATE) +ui/bindata.go: build/$(UPTODATE) go-bindata -pkg ui -o ui/bindata.go -ignore '(.*\.map|bootstrap\.js|bootstrap-theme\.css|bootstrap\.css)' ui/templates/... ui/static/... -lint: cortex-build/$(UPTODATE) +lint: build/$(UPTODATE) ./tools/lint -notestpackage -ignorespelling queriers -ignorespelling Queriers . 
-test: cortex-build/$(UPTODATE) +test: build/$(UPTODATE) ./tools/test -netgo -shell: cortex-build/$(UPTODATE) +shell: build/$(UPTODATE) bash endif diff --git a/cortex-build/Dockerfile b/build/Dockerfile similarity index 100% rename from cortex-build/Dockerfile rename to build/Dockerfile diff --git a/cortex-build/build.sh b/build/build.sh similarity index 100% rename from cortex-build/build.sh rename to build/build.sh diff --git a/chunk/chunk_cache.go b/chunk/chunk_cache.go index 38c19446e42..514c2032f29 100644 --- a/chunk/chunk_cache.go +++ b/chunk/chunk_cache.go @@ -2,6 +2,7 @@ package chunk import ( "bytes" + "flag" "fmt" "io/ioutil" "time" @@ -47,10 +48,34 @@ type Memcache interface { Set(item *memcache.Item) error } +// CacheConfig is config to make a Cache +type CacheConfig struct { + Expiration time.Duration + memcacheConfig MemcacheConfig +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") + cfg.memcacheConfig.RegisterFlags(f) +} + // Cache type caches chunks type Cache struct { - Memcache Memcache - Expiration time.Duration + cfg CacheConfig + memcache Memcache +} + +// NewCache makes a new Cache +func NewCache(cfg CacheConfig) *Cache { + var memcache Memcache + if cfg.memcacheConfig.Host != "" { + memcache = NewMemcacheClient(cfg.memcacheConfig) + } + return &Cache{ + cfg: cfg, + memcache: memcache, + } } func memcacheStatusCode(err error) string { @@ -73,6 +98,10 @@ func memcacheKey(userID, chunkID string) string { // FetchChunkData gets chunks from the chunk cache. 
func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chunk) (found []Chunk, missing []Chunk, err error) { + if c.memcache == nil { + return nil, chunks, nil + } + memcacheRequests.Add(float64(len(chunks))) keys := make([]string, 0, len(chunks)) @@ -83,7 +112,7 @@ func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chun var items map[string]*memcache.Item err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { var err error - items, err = c.Memcache.GetMulti(keys) + items, err = c.memcache.GetMulti(keys) return err }) if err != nil { @@ -111,6 +140,10 @@ func (c *Cache) FetchChunkData(ctx context.Context, userID string, chunks []Chun // StoreChunkData serializes and stores a chunk in the chunk cache. func (c *Cache) StoreChunkData(ctx context.Context, userID string, chunk *Chunk) error { + if c.memcache == nil { + return nil + } + reader, err := chunk.reader() if err != nil { return err @@ -125,9 +158,9 @@ func (c *Cache) StoreChunkData(ctx context.Context, userID string, chunk *Chunk) item := memcache.Item{ Key: memcacheKey(userID, chunk.ID), Value: buf, - Expiration: int32(c.Expiration.Seconds()), + Expiration: int32(c.cfg.Expiration.Seconds()), } - return c.Memcache.Set(&item) + return c.memcache.Set(&item) }) } diff --git a/chunk/chunk_store.go b/chunk/chunk_store.go index f69a5fe8f47..e67c664a244 100644 --- a/chunk/chunk_store.go +++ b/chunk/chunk_store.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/base64" "encoding/json" + "flag" "fmt" "sort" "strconv" @@ -21,6 +22,7 @@ import ( "golang.org/x/net/context" "github.com/weaveworks/cortex/user" + "github.com/weaveworks/cortex/util" ) const ( @@ -89,20 +91,28 @@ type Store interface { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { - S3 S3Client - BucketName string - DynamoDB DynamoDBClient - TableName string - ChunkCache *Cache + PeriodicTableConfig + 
CacheConfig + S3 S3ClientValue + DynamoDB DynamoDBClientValue // After midnight on this day, we start bucketing indexes by day instead of by // hour. Only the day matters, not the time within the day. - DailyBucketsFrom model.Time + DailyBucketsFrom util.DayValue // After this time, we will only query for base64-encoded label values. - Base64ValuesFrom model.Time + Base64ValuesFrom util.DayValue +} - PeriodicTableConfig +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { + cfg.PeriodicTableConfig.RegisterFlags(f) + cfg.CacheConfig.RegisterFlags(f) + + f.Var(&cfg.S3, "s3.url", "S3 endpoint URL.") + f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL.") + f.Var(&cfg.DailyBucketsFrom, "dynamodb.daily-buckets-from", "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized.") + f.Var(&cfg.Base64ValuesFrom, "dynamodb.base64-buckets-from", "The date in the format YYYY-MM-DD after which we will stop querying to non-base64 encoded values.") } // PeriodicTableConfig for the use of periodic tables (ie, weekly talbes). 
Can @@ -112,13 +122,21 @@ type PeriodicTableConfig struct { UsePeriodicTables bool TablePrefix string TablePeriod time.Duration - PeriodicTableStartAt time.Time + PeriodicTableStartAt util.DayValue +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *PeriodicTableConfig) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.UsePeriodicTables, "dynamodb.use-periodic-tables", true, "Should we user periodic tables.") + f.StringVar(&cfg.TablePrefix, "dynamodb.periodic-table.prefix", "cortex_", "DynamoDB table prefix for the periodic tables.") + f.DurationVar(&cfg.TablePeriod, "dynamodb.periodic-table.period", 7*24*time.Hour, "DynamoDB periodic tables period.") + f.Var(&cfg.PeriodicTableStartAt, "dynamodb.periodic-table.start", "DynamoDB periodic tables start time.") } // AWSStore implements ChunkStore for AWS type AWSStore struct { - cfg StoreConfig - + cfg StoreConfig + cache *Cache dynamo *dynamoDBBackoffClient } @@ -126,6 +144,7 @@ type AWSStore struct { func NewAWSStore(cfg StoreConfig) *AWSStore { return &AWSStore{ cfg: cfg, + cache: NewCache(cfg.CacheConfig), dynamo: newDynamoDBBackoffClient(cfg.DynamoDB), } } @@ -184,7 +203,7 @@ func (c *AWSStore) bigBuckets(from, through model.Time) []bucketSpec { func (c *AWSStore) tableForBucket(bucketStart int64) string { if !c.cfg.UsePeriodicTables || bucketStart < (c.cfg.PeriodicTableStartAt.Unix()) { - return c.cfg.TableName + return c.cfg.DynamoDB.TableName } return c.cfg.TablePrefix + strconv.Itoa(int(bucketStart/int64(c.cfg.TablePeriod/time.Second))) } @@ -356,7 +375,7 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er var err error _, err = c.cfg.S3.PutObject(&s3.PutObjectInput{ Body: body, - Bucket: aws.String(c.cfg.BucketName), + Bucket: aws.String(c.cfg.S3.BucketName), Key: aws.String(chunkName(userID, chunk.ID)), }) return err @@ -365,10 +384,8 @@ func (c *AWSStore) putChunk(ctx context.Context, userID string, chunk *Chunk) er return err } - if 
c.cfg.ChunkCache != nil { - if err = c.cfg.ChunkCache.StoreChunkData(ctx, userID, chunk); err != nil { - log.Warnf("Could not store %v in chunk cache: %v", chunk.ID, err) - } + if err = c.cache.StoreChunkData(ctx, userID, chunk); err != nil { + log.Warnf("Could not store %v in chunk cache: %v", chunk.ID, err) } return nil } @@ -433,11 +450,9 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers . queryChunks.Observe(float64(len(missing))) var fromCache []Chunk - if c.cfg.ChunkCache != nil { - fromCache, missing, err = c.cfg.ChunkCache.FetchChunkData(ctx, userID, missing) - if err != nil { - log.Warnf("Error fetching from cache: %v", err) - } + fromCache, missing, err = c.cache.FetchChunkData(ctx, userID, missing) + if err != nil { + log.Warnf("Error fetching from cache: %v", err) } fromS3, err := c.fetchChunkData(ctx, userID, missing) @@ -445,10 +460,8 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers . return nil, err } - if c.cfg.ChunkCache != nil { - if err = c.cfg.ChunkCache.StoreChunks(ctx, userID, fromS3); err != nil { - log.Warnf("Could not store chunks in chunk cache: %v", err) - } + if err = c.cache.StoreChunks(ctx, userID, fromS3); err != nil { + log.Warnf("Could not store chunks in chunk cache: %v", err) } // TODO instead of doing this sort, propagate an index and assign chunks @@ -611,7 +624,7 @@ func (c *AWSStore) lookupChunksForMatcher(ctx context.Context, userID string, bu return nil, err } - if matcher.Type == metric.Equal && bucket.startTime.Before(c.cfg.Base64ValuesFrom) { + if matcher.Type == metric.Equal && bucket.startTime.Before(c.cfg.Base64ValuesFrom.Time) { legacyRangePrefix, err := rangeValueKeyAndValueOnly(matcher.Name, matcher.Value) if err != nil { return nil, err @@ -715,7 +728,7 @@ func (c *AWSStore) fetchChunkData(ctx context.Context, userID string, chunkSet [ err := instrument.TimeRequestHistogram(ctx, "S3.GetObject", s3RequestDuration, func(_ context.Context) error { var 
err error resp, err = c.cfg.S3.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(c.cfg.BucketName), + Bucket: aws.String(c.cfg.S3.BucketName), Key: aws.String(chunkName(userID, chunk.ID)), }) return err diff --git a/chunk/chunk_store_test.go b/chunk/chunk_store_test.go index 99991e2b32a..517a19f846d 100644 --- a/chunk/chunk_store_test.go +++ b/chunk/chunk_store_test.go @@ -16,6 +16,7 @@ import ( "golang.org/x/net/context" "github.com/weaveworks/cortex/user" + "github.com/weaveworks/cortex/util" ) func init() { @@ -24,7 +25,9 @@ func init() { func setupDynamodb(t *testing.T, dynamoDB DynamoDBClient) { tableManager, err := NewDynamoTableManager(TableManagerConfig{ - DynamoDB: dynamoDB, + DynamoDB: DynamoDBClientValue{ + DynamoDBClient: dynamoDB, + }, }) if err != nil { t.Fatal(err) @@ -38,8 +41,12 @@ func TestChunkStoreUnprocessed(t *testing.T) { dynamoDB := NewMockDynamoDB(2, 2) setupDynamodb(t, dynamoDB) store := NewAWSStore(StoreConfig{ - DynamoDB: dynamoDB, - S3: NewMockS3(), + DynamoDB: DynamoDBClientValue{ + DynamoDBClient: dynamoDB, + }, + S3: S3ClientValue{ + S3Client: NewMockS3(), + }, }) ctx := user.WithID(context.Background(), "0") @@ -73,8 +80,12 @@ func TestChunkStore(t *testing.T) { dynamoDB := NewMockDynamoDB(0, 0) setupDynamodb(t, dynamoDB) store := NewAWSStore(StoreConfig{ - DynamoDB: dynamoDB, - S3: NewMockS3(), + DynamoDB: DynamoDBClientValue{ + DynamoDBClient: dynamoDB, + }, + S3: S3ClientValue{ + S3Client: NewMockS3(), + }, }) ctx := user.WithID(context.Background(), "0") @@ -181,8 +192,9 @@ func TestBigBuckets(t *testing.T) { scenarios := []struct { from, through, dailyBucketsFrom model.Time - periodicTablesFrom time.Time + periodicTablesFrom model.Time periodicTablesPeriod time.Duration + usePeriodicTables bool buckets []bucketSpec }{ // Buckets are by hour until we reach the `dailyBucketsFrom`, after which they are by day. 
@@ -230,7 +242,8 @@ func TestBigBuckets(t *testing.T) { from: model.TimeFromUnix(0), through: model.TimeFromUnix(0).Add(4*24*time.Hour) - 1, dailyBucketsFrom: model.TimeFromUnix(0), - periodicTablesFrom: time.Unix(0, 0), + usePeriodicTables: true, + periodicTablesFrom: model.TimeFromUnix(0), periodicTablesPeriod: 2 * 24 * time.Hour, buckets: mergeBuckets( buckets(periodicPrefix+"0", []string{"d0", "d1"}), @@ -243,7 +256,8 @@ func TestBigBuckets(t *testing.T) { from: model.TimeFromUnix(0), through: model.TimeFromUnix(0).Add(4*24*time.Hour) - 1, dailyBucketsFrom: model.TimeFromUnix(0).Add(2*24*time.Hour) - 1, - periodicTablesFrom: time.Unix(0, 0).Add(1 * 24 * time.Hour), + usePeriodicTables: true, + periodicTablesFrom: model.TimeFromUnix(0).Add(1 * 24 * time.Hour), periodicTablesPeriod: 2 * 24 * time.Hour, buckets: mergeBuckets( buckets(tableName, firstDayBuckets), @@ -254,15 +268,17 @@ func TestBigBuckets(t *testing.T) { } for i, s := range scenarios { t.Run(fmt.Sprintf("Case %d", i), func(t *testing.T) { - cs := &AWSStore{ + cs := AWSStore{ cfg: StoreConfig{ - TableName: tableName, - DailyBucketsFrom: s.dailyBucketsFrom, + DynamoDB: DynamoDBClientValue{ + TableName: tableName, + }, + DailyBucketsFrom: util.DayValue{s.dailyBucketsFrom}, PeriodicTableConfig: PeriodicTableConfig{ - UsePeriodicTables: !s.periodicTablesFrom.IsZero(), + UsePeriodicTables: s.usePeriodicTables, TablePeriod: s.periodicTablesPeriod, TablePrefix: periodicPrefix, - PeriodicTableStartAt: s.periodicTablesFrom, + PeriodicTableStartAt: util.DayValue{s.periodicTablesFrom}, }, }, } diff --git a/chunk/dynamo_table_manager.go b/chunk/dynamo_table_manager.go index 895b0a574c7..98e11ef1718 100644 --- a/chunk/dynamo_table_manager.go +++ b/chunk/dynamo_table_manager.go @@ -1,6 +1,7 @@ package chunk import ( + "flag" "sort" "strconv" "sync" @@ -41,8 +42,7 @@ func init() { // TableManagerConfig is the config for a DynamoTableManager type TableManagerConfig struct { - DynamoDB DynamoDBClient - TableName 
string + DynamoDB DynamoDBClientValue DynamoDBPollInterval time.Duration PeriodicTableConfig @@ -54,6 +54,17 @@ type TableManagerConfig struct { ProvisionedReadThroughput int64 } +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *TableManagerConfig) RegisterFlags(f *flag.FlagSet) { + f.Var(&cfg.DynamoDB, "dynamodb.url", "DynamoDB endpoint URL.") + f.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") + f.DurationVar(&cfg.CreationGracePeriod, "dynamodb.periodic-table.grace-period", 10*time.Minute, "DynamoDB periodic tables grace period (duration which table will be created/deleted before/after it's needed).") + f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.") + f.Int64Var(&cfg.ProvisionedWriteThroughput, "dynamodb.periodic-table.write-throughput", 3000, "DynamoDB periodic tables write throughput") + f.Int64Var(&cfg.ProvisionedReadThroughput, "dynamodb.periodic-table.read-throughput", 300, "DynamoDB periodic tables read throughput") + cfg.PeriodicTableConfig.RegisterFlags(f) +} + // DynamoTableManager creates and manages the provisioned throughput on DynamoDB tables type DynamoTableManager struct { cfg TableManagerConfig @@ -140,7 +151,7 @@ func (m *DynamoTableManager) calculateExpectedTables() []tableDescription { if !m.cfg.UsePeriodicTables { return []tableDescription{ { - name: m.cfg.TableName, + name: m.cfg.DynamoDB.TableName, provisionedRead: m.cfg.ProvisionedReadThroughput, provisionedWrite: m.cfg.ProvisionedWriteThroughput, }, @@ -161,7 +172,7 @@ func (m *DynamoTableManager) calculateExpectedTables() []tableDescription { // Add the legacy table { legacyTable := tableDescription{ - name: m.cfg.TableName, + name: m.cfg.DynamoDB.TableName, provisionedRead: m.cfg.ProvisionedReadThroughput, provisionedWrite: minWriteCapacity, } diff --git a/chunk/dynamo_table_manager_test.go 
b/chunk/dynamo_table_manager_test.go index e6b45baedab..cd2a27e433a 100644 --- a/chunk/dynamo_table_manager_test.go +++ b/chunk/dynamo_table_manager_test.go @@ -7,8 +7,11 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/prometheus/common/model" "github.com/weaveworks/common/mtime" "golang.org/x/net/context" + + "github.com/weaveworks/cortex/util" ) const ( @@ -24,13 +27,15 @@ func TestDynamoTableManager(t *testing.T) { dynamoDB := NewMockDynamoDB(0, 0) cfg := TableManagerConfig{ - DynamoDB: dynamoDB, + DynamoDB: DynamoDBClientValue{ + DynamoDBClient: dynamoDB, + }, PeriodicTableConfig: PeriodicTableConfig{ UsePeriodicTables: true, TablePrefix: tablePrefix, TablePeriod: tablePeriod, - PeriodicTableStartAt: time.Unix(0, 0), + PeriodicTableStartAt: util.DayValue{model.TimeFromUnix(0)}, }, CreationGracePeriod: gracePeriod, diff --git a/chunk/dynamodb_client.go b/chunk/dynamodb_client.go index d70704ba267..3493e4334de 100644 --- a/chunk/dynamodb_client.go +++ b/chunk/dynamodb_client.go @@ -114,6 +114,24 @@ func NewDynamoDBClient(dynamoDBURL string) (DynamoDBClient, string, error) { return dynamoDBClient, tableName, nil } +// DynamoDBClientValue is a flag.Value that parses a URL and produces a DynamoDBClient +type DynamoDBClientValue struct { + url, TableName string + DynamoDBClient +} + +// String implements flag.Value +func (c *DynamoDBClientValue) String() string { + return c.url +} + +// Set implements flag.Value +func (c *DynamoDBClientValue) Set(v string) error { + var err error + c.DynamoDBClient, c.TableName, err = NewDynamoDBClient(v) + return err +} + type dynamoClientAdapter struct { *dynamodb.DynamoDB } diff --git a/chunk/memcache_client.go b/chunk/memcache_client.go index e4da4f61b6b..1b5b976d4a1 100644 --- a/chunk/memcache_client.go +++ b/chunk/memcache_client.go @@ -1,6 +1,7 @@ package chunk import ( + "flag" "fmt" "net" "sort" @@ -31,27 +32,35 @@ type MemcacheConfig struct { UpdateInterval 
time.Duration } +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *MemcacheConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Host, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") + f.StringVar(&cfg.Service, "memcached.service", "memcached", "SRV service used to discover memcache servers.") + f.DurationVar(&cfg.Timeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") + f.DurationVar(&cfg.UpdateInterval, "memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") +} + // NewMemcacheClient creates a new MemcacheClient that gets its server list // from SRV and updates the server list on a regular basis. -func NewMemcacheClient(config MemcacheConfig) *MemcacheClient { +func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient { var servers memcache.ServerList client := memcache.NewFromSelector(&servers) - client.Timeout = config.Timeout + client.Timeout = cfg.Timeout newClient := &MemcacheClient{ Client: client, serverList: &servers, - hostname: config.Host, - service: config.Service, + hostname: cfg.Host, + service: cfg.Service, quit: make(chan struct{}), } err := newClient.updateMemcacheServers() if err != nil { - log.Errorf("Error setting memcache servers to '%v': %v", config.Host, err) + log.Errorf("Error setting memcache servers to '%v': %v", cfg.Host, err) } newClient.wait.Add(1) - go newClient.updateLoop(config.UpdateInterval) + go newClient.updateLoop(cfg.UpdateInterval) return newClient } diff --git a/chunk/s3_client.go b/chunk/s3_client.go index cc665ed1995..8b5df5ad749 100644 --- a/chunk/s3_client.go +++ b/chunk/s3_client.go @@ -51,3 +51,21 @@ func awsConfigFromURL(url *url.URL) (*aws.Config, error) { } return config, nil } + +// S3ClientValue is a flag.Value that parses a URL and produces a S3Client +type S3ClientValue struct { + url, 
BucketName string + S3Client +} + +// String implements flag.Value +func (c *S3ClientValue) String() string { + return c.url +} + +// Set implements flag.Value +func (c *S3ClientValue) Set(v string) error { + var err error + c.S3Client, c.BucketName, err = NewS3Client(v) + return err +} diff --git a/circle.yml b/circle.yml index 6ec7a817b15..6525adcede9 100644 --- a/circle.yml +++ b/circle.yml @@ -9,7 +9,7 @@ dependencies: - "~/docker" override: - | - cd cortex-build && \ + cd build && \ ../tools/rebuild-image weaveworks/cortex-build . build.sh Dockerfile && \ touch .uptodate @@ -24,10 +24,5 @@ deployment: branch: master commands: - docker login -e "$DOCKER_REGISTRY_EMAIL" -u "$DOCKER_REGISTRY_USER" -p "$DOCKER_REGISTRY_PASSWORD" - - docker push weaveworks/cortex:$(./tools/image-tag) - - docker push weaveworks/cortex_table_manager:$(./tools/image-tag) - docker login -e '.' -u "$QUAY_USER" -p "$QUAY_PASSWORD" quay.io - - docker tag weaveworks/cortex:$(./tools/image-tag) quay.io/weaveworks/cortex:$(./tools/image-tag) - - docker tag weaveworks/cortex_table_manager:$(./tools/image-tag) quay.io/weaveworks/cortex_table_manager:$(./tools/image-tag) - - docker push quay.io/weaveworks/cortex:$(./tools/image-tag) - - docker push quay.io/weaveworks/cortex_table_manager:$(./tools/image-tag) + - ./push-images diff --git a/cmd/cortex/Dockerfile b/cmd/cortex/Dockerfile deleted file mode 100644 index 171344d2b33..00000000000 --- a/cmd/cortex/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM quay.io/prometheus/busybox:latest -COPY cortex /bin/cortex -EXPOSE 9094 -ENTRYPOINT [ "/bin/cortex" ] diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go deleted file mode 100644 index 211cec024a0..00000000000 --- a/cmd/cortex/main.go +++ /dev/null @@ -1,381 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "net" - "net/http" - _ "net/http/pprof" - "net/url" - "os" - "os/signal" - "syscall" - "time" - - "github.com/gorilla/mux" - "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" - 
"github.com/mwitkow/go-grpc-middleware" - "github.com/opentracing-contrib/go-stdlib/nethttp" - "github.com/opentracing/opentracing-go" - "github.com/weaveworks/scope/common/middleware" - "golang.org/x/net/context" - "google.golang.org/grpc" - - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/web/api/v1" - - "github.com/weaveworks/cortex" - "github.com/weaveworks/cortex/chunk" - "github.com/weaveworks/cortex/distributor" - "github.com/weaveworks/cortex/ingester" - "github.com/weaveworks/cortex/querier" - "github.com/weaveworks/cortex/ring" - "github.com/weaveworks/cortex/ruler" - "github.com/weaveworks/cortex/ui" - "github.com/weaveworks/cortex/user" - cortex_grpc_middleware "github.com/weaveworks/cortex/util/middleware" -) - -const ( - modeDistributor = "distributor" - modeIngester = "ingester" - modeRuler = "ruler" - - infName = "eth0" -) - -var ( - requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "request_duration_seconds", - Help: "Time (in seconds) spent serving HTTP requests.", - Buckets: prometheus.DefBuckets, - }, []string{"method", "route", "status_code", "ws"}) -) - -func init() { - prometheus.MustRegister(requestDuration) -} - -type cfg struct { - mode string - listenPort int - consulHost string - consulPrefix string - s3URL string - - dynamodbURL string - dynamodbCreateTables bool - dynamodbPollInterval time.Duration - dynamodbDailyBucketsFrom string - dynamodbPeriodicTableStartAt string - dynamodbTablePrefix string - dynamodbTablePeriod time.Duration - dynamodbBase64ValuesFrom string - - memcachedHostname string - memcachedTimeout time.Duration - memcachedExpiration time.Duration - memcachedService string - remoteTimeout time.Duration - numTokens int - logSuccess bool - watchDynamo bool - - ingesterConfig 
ingester.Config - distributorConfig distributor.Config - rulerConfig ruler.Config -} - -func main() { - var cfg cfg - flag.StringVar(&cfg.mode, "mode", modeDistributor, "Mode (distributor, ingester, ruler).") - flag.IntVar(&cfg.listenPort, "web.listen-port", 9094, "HTTP server listen port.") - flag.BoolVar(&cfg.logSuccess, "log.success", false, "Log successful requests") - - flag.StringVar(&cfg.consulHost, "consul.hostname", "localhost:8500", "Hostname and port of Consul.") - flag.StringVar(&cfg.consulPrefix, "consul.prefix", "collectors/", "Prefix for keys in Consul.") - - flag.StringVar(&cfg.s3URL, "s3.url", "localhost:4569", "S3 endpoint URL.") - flag.StringVar(&cfg.dynamodbURL, "dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.") - flag.DurationVar(&cfg.dynamodbPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") - flag.StringVar(&cfg.dynamodbDailyBucketsFrom, "dynamodb.daily-buckets-from", "9999-01-01", "The date in the format YYYY-MM-DD of the first day for which DynamoDB index buckets should be day-sized vs. hour-sized.") - flag.StringVar(&cfg.dynamodbPeriodicTableStartAt, "dynamodb.periodic-table.start", "", "DynamoDB periodic tables start time. If unspecified, don't use periodic tables.") - flag.StringVar(&cfg.dynamodbTablePrefix, "dynamodb.periodic-table.prefix", "cortex_", "DynamoDB table prefix for the periodic tables.") - flag.DurationVar(&cfg.dynamodbTablePeriod, "dynamodb.periodic-table.period", 7*24*time.Hour, "DynamoDB periodic tables period.") - flag.StringVar(&cfg.dynamodbBase64ValuesFrom, "dynamodb.base64-buckets-from", "9999-01-01", "The date in the format YYYY-MM-DD after which we will stop querying to non-base64 encoded values.") - - flag.StringVar(&cfg.memcachedHostname, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. 
If empty, no memcached will be used.") - flag.StringVar(&cfg.memcachedService, "memcached.service", "memcached", "SRV service used to discover memcache servers.") - flag.DurationVar(&cfg.memcachedTimeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") - flag.DurationVar(&cfg.memcachedExpiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") - - flag.DurationVar(&cfg.ingesterConfig.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") - flag.DurationVar(&cfg.ingesterConfig.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.") - flag.DurationVar(&cfg.ingesterConfig.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.") - flag.DurationVar(&cfg.ingesterConfig.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.") - flag.IntVar(&cfg.ingesterConfig.ConcurrentFlushes, "ingester.concurrent-flushes", ingester.DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.") - flag.IntVar(&cfg.numTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") - flag.IntVar(&cfg.ingesterConfig.GRPCListenPort, "ingester.grpc.listen-port", 9095, "gRPC server listen port.") - - flag.IntVar(&cfg.distributorConfig.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") - flag.IntVar(&cfg.distributorConfig.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.") - flag.DurationVar(&cfg.distributorConfig.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") - flag.DurationVar(&cfg.distributorConfig.RemoteTimeout, "distributor.remote-timeout", 5*time.Second, "Timeout for 
downstream ingesters.") - - flag.StringVar(&cfg.rulerConfig.ConfigsAPIURL, "ruler.configs.url", "", "URL of configs API server.") - flag.DurationVar(&cfg.rulerConfig.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules") - flag.DurationVar(&cfg.rulerConfig.ClientTimeout, "ruler.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") - flag.IntVar(&cfg.rulerConfig.NumWorkers, "ruler.num-workers", 1, "Number of rule evaluator worker routines in this process") - - flag.Parse() - - chunkStore, err := setupChunkStore(cfg) - if err != nil { - log.Fatalf("Error initializing chunk store: %v", err) - } - if cfg.dynamodbPollInterval < 1*time.Minute { - log.Warnf("Polling DynamoDB more than once a minute. Likely to get throttled: %v", cfg.dynamodbPollInterval) - } - - ringCodec := ring.NewDynamicCodec( - ring.JSONCodec{Factory: ring.DescFactory}, - ring.ProtoCodec{Factory: ring.ProtoDescFactory}, - ) - consul, err := ring.NewConsulClient(cfg.consulHost, ringCodec) - if err != nil { - log.Fatalf("Error initializing Consul client: %v", err) - } - consul = ring.PrefixClient(consul, cfg.consulPrefix) - r := ring.New(consul, cfg.distributorConfig.HeartbeatTimeout) - defer r.Stop() - - router := mux.NewRouter() - router.Handle("/ring", r) - - switch cfg.mode { - case modeDistributor: - cfg.distributorConfig.Ring = r - setupDistributor(cfg.distributorConfig, chunkStore, router.PathPrefix("/api/prom").Subrouter()) - - case modeIngester: - cfg.ingesterConfig.Ring = r - registration, err := ring.RegisterIngester(consul, ring.IngesterRegistrationConfig{ - ListenPort: cfg.ingesterConfig.GRPCListenPort, - NumTokens: cfg.numTokens, - Codec: ringCodec, - }) - if err != nil { - // This only happens for errors in configuration & set-up, not for - // network errors. 
- log.Fatalf("Could not register ingester: %v", err) - } - ing := setupIngester(chunkStore, cfg.ingesterConfig, router) - - // Setup gRPC server - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", cfg.ingesterConfig.GRPCListenPort)) - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - grpcServer := grpc.NewServer( - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - cortex_grpc_middleware.ServerLoggingInterceptor(cfg.logSuccess), - cortex_grpc_middleware.ServerInstrumentInterceptor(requestDuration), - otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer()), - cortex_grpc_middleware.ServerUserHeaderInterceptor, - )), - ) - cortex.RegisterIngesterServer(grpcServer, ing) - go grpcServer.Serve(lis) - defer grpcServer.Stop() - - // Deferring a func to make ordering obvious - defer func() { - registration.ChangeState(ring.IngesterState_LEAVING) - ing.Stop() - registration.Unregister() - }() - - prometheus.MustRegister(registration) - - case modeRuler: - cfg.distributorConfig.Ring = r - ruler, err := setupRuler(cfg.distributorConfig, chunkStore, cfg.rulerConfig) - if err != nil { - // Some of our initial configuration was fundamentally invalid. 
- log.Fatalf("Could not set up ruler: %v", err) - } - go ruler.Run() - defer ruler.Stop() - - default: - log.Fatalf("Mode %s not supported!", cfg.mode) - } - - router.Handle("/metrics", prometheus.Handler()) - instrumented := middleware.Merge( - middleware.Func(func(handler http.Handler) http.Handler { - return nethttp.Middleware(opentracing.GlobalTracer(), handler) - }), - middleware.Log{ - LogSuccess: cfg.logSuccess, - }, - middleware.Instrument{ - Duration: requestDuration, - RouteMatcher: router, - }, - ).Wrap(router) - go http.ListenAndServe(fmt.Sprintf(":%d", cfg.listenPort), instrumented) - - term := make(chan os.Signal) - signal.Notify(term, os.Interrupt, syscall.SIGTERM) - <-term - log.Warn("Received SIGTERM, exiting gracefully...") -} - -func setupChunkStore(cfg cfg) (chunk.Store, error) { - var chunkCache *chunk.Cache - if cfg.memcachedHostname != "" { - chunkCache = &chunk.Cache{ - Memcache: chunk.NewMemcacheClient(chunk.MemcacheConfig{ - Host: cfg.memcachedHostname, - Service: cfg.memcachedService, - Timeout: cfg.memcachedTimeout, - UpdateInterval: 1 * time.Minute, - }), - Expiration: cfg.memcachedExpiration, - } - } - - s3Client, bucketName, err := chunk.NewS3Client(cfg.s3URL) - if err != nil { - return nil, err - } - - dynamoDBClient, tableName, err := chunk.NewDynamoDBClient(cfg.dynamodbURL) - if err != nil { - return nil, err - } - - dailyBucketsFrom, err := time.Parse("2006-01-02", cfg.dynamodbDailyBucketsFrom) - if err != nil { - return nil, fmt.Errorf("error parsing daily buckets begin date: %v", err) - } - - base64ValuesFrom, err := time.Parse("2006-01-02", cfg.dynamodbBase64ValuesFrom) - if err != nil { - return nil, fmt.Errorf("error parsing base64 values begin date: %v", err) - } - - usePeriodicTables, periodicTableStartAt := false, time.Time{} - if cfg.dynamodbPeriodicTableStartAt != "" { - usePeriodicTables = true - periodicTableStartAt, err = time.Parse(time.RFC3339, cfg.dynamodbPeriodicTableStartAt) - if err != nil { - log.Fatalf("Error 
parsing dynamodb.periodic-table.start: %v", err) - } - } - - return chunk.NewAWSStore(chunk.StoreConfig{ - S3: s3Client, - BucketName: bucketName, - DynamoDB: dynamoDBClient, - TableName: tableName, - ChunkCache: chunkCache, - - DailyBucketsFrom: model.TimeFromUnix(dailyBucketsFrom.Unix()), - Base64ValuesFrom: model.TimeFromUnix(base64ValuesFrom.Unix()), - - PeriodicTableConfig: chunk.PeriodicTableConfig{ - UsePeriodicTables: usePeriodicTables, - TablePrefix: cfg.dynamodbTablePrefix, - TablePeriod: cfg.dynamodbTablePeriod, - PeriodicTableStartAt: periodicTableStartAt, - }, - }), nil -} - -func setupDistributor( - cfg distributor.Config, - chunkStore chunk.Store, - router *mux.Router, -) { - dist, err := distributor.New(cfg) - if err != nil { - log.Fatal(err) - } - prometheus.MustRegister(dist) - - router.Path("/push").Handler(http.HandlerFunc(dist.PushHandler)) - - // TODO: Move querier to separate binary. - setupQuerier(dist, chunkStore, router) -} - -// setupQuerier sets up a complete querying pipeline: -// -// PromQL -> MergeQuerier -> Distributor -> IngesterQuerier -> Ingester -// | -// `----------> ChunkQuerier -> DynamoDB/S3 -func setupQuerier( - distributor *distributor.Distributor, - chunkStore chunk.Store, - router *mux.Router, -) { - queryable := querier.NewQueryable(distributor, chunkStore) - engine := promql.NewEngine(queryable, nil) - api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable}) - promRouter := route.New(func(r *http.Request) (context.Context, error) { - userID := r.Header.Get(user.UserIDHeaderName) - if userID == "" { - return nil, fmt.Errorf("no %s header", user.UserIDHeaderName) - } - return user.WithID(r.Context(), userID), nil - }).WithPrefix("/api/prom/api/v1") - api.Register(promRouter) - router.PathPrefix("/api/v1").Handler(promRouter) - router.Path("/validate_expr").Handler(http.HandlerFunc(distributor.ValidateExprHandler)) - router.Path("/user_stats").Handler(http.HandlerFunc(distributor.UserStatsHandler)) - 
router.Path("/graph").Handler(ui.GraphHandler()) - router.PathPrefix("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/")) -} - -func setupIngester( - chunkStore chunk.Store, - cfg ingester.Config, - router *mux.Router, -) *ingester.Ingester { - ingester, err := ingester.New(cfg, chunkStore) - if err != nil { - log.Fatal(err) - } - prometheus.MustRegister(ingester) - - // This interface is temporary until rolled out to prod, then we can remove it - // in favour of the gRPC interface. - router.Path("/push").Handler(http.HandlerFunc(ingester.PushHandler)) - router.Path("/query").Handler(http.HandlerFunc(ingester.QueryHandler)) - router.Path("/label_values").Handler(http.HandlerFunc(ingester.LabelValuesHandler)) - router.Path("/user_stats").Handler(http.HandlerFunc(ingester.UserStatsHandler)) - router.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler)) - return ingester -} - -// setupRuler sets up a ruler. -func setupRuler(distributorCfg distributor.Config, chunkStore chunk.Store, cfg ruler.Config) (ruler.Worker, error) { - dist, err := distributor.New(distributorCfg) - if err != nil { - return nil, err - } - - externalURL, err := url.Parse(cfg.ExternalURL) - if err != nil { - return nil, err - } - - return ruler.NewServer(cfg, ruler.NewRuler(dist, chunkStore, externalURL)) -} diff --git a/cmd/cortex_table_manager/Dockerfile b/cmd/cortex_table_manager/Dockerfile deleted file mode 100644 index c8c5a26d3a5..00000000000 --- a/cmd/cortex_table_manager/Dockerfile +++ /dev/null @@ -1,4 +0,0 @@ -FROM quay.io/prometheus/busybox:latest -COPY cortex_table_manager /bin/cortex_table_manager -EXPOSE 9094 -ENTRYPOINT [ "/bin/cortex_table_manager" ] diff --git a/cmd/cortex_table_manager/main.go b/cmd/cortex_table_manager/main.go deleted file mode 100644 index 107403eb2af..00000000000 --- a/cmd/cortex_table_manager/main.go +++ /dev/null @@ -1,53 +0,0 @@ -package main - -import ( - "flag" - "net/http" - "time" - - 
"github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/log" - - "github.com/weaveworks/cortex/chunk" -) - -func main() { - cfg := chunk.TableManagerConfig{ - PeriodicTableConfig: chunk.PeriodicTableConfig{ - UsePeriodicTables: true, - }, - } - - addr := flag.String("web.listen-addr", ":80", "The address to listen on for HTTP requests.") - flag.DurationVar(&cfg.DynamoDBPollInterval, "dynamodb.poll-interval", 2*time.Minute, "How frequently to poll DynamoDB to learn our capacity.") - periodicTableStartAt := flag.String("dynamodb.periodic-table.start", "", "DynamoDB periodic tables start time.") - dynamodbURL := flag.String("dynamodb.url", "localhost:8000", "DynamoDB endpoint URL.") - flag.StringVar(&cfg.TablePrefix, "dynamodb.periodic-table.prefix", "cortex_", "DynamoDB table prefix for the periodic tables.") - flag.DurationVar(&cfg.TablePeriod, "dynamodb.periodic-table.period", 7*24*time.Hour, "DynamoDB periodic tables period.") - flag.DurationVar(&cfg.CreationGracePeriod, "dynamodb.periodic-table.grace-period", 10*time.Minute, "DynamoDB periodic tables grace period (duration which table will be created/deleted before/after it's needed).") - flag.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.") - flag.Int64Var(&cfg.ProvisionedWriteThroughput, "dynamodb.periodic-table.write-throughput", 3000, "DynamoDB periodic tables write throughput") - flag.Int64Var(&cfg.ProvisionedReadThroughput, "dynamodb.periodic-table.read-throughput", 300, "DynamoDB periodic tables read throughput") - flag.Parse() - - var err error - cfg.PeriodicTableStartAt, err = time.Parse(time.RFC3339, *periodicTableStartAt) - if err != nil { - log.Fatalf("Error parsing dynamodb.periodic-table.start: %v", err) - } - - cfg.DynamoDB, cfg.TableName, err = chunk.NewDynamoDBClient(*dynamodbURL) - if err != nil { - log.Fatalf("Error creating DynamoDB client: %v", err) - } - - tableManager, err := 
chunk.NewDynamoTableManager(cfg) - if err != nil { - log.Fatalf("Error initializing DynamoDB table manager: %v", err) - } - tableManager.Start() - defer tableManager.Stop() - - http.Handle("/metrics", prometheus.Handler()) - log.Fatal(http.ListenAndServe(*addr, nil)) -} diff --git a/cmd/distributor/Dockerfile b/cmd/distributor/Dockerfile new file mode 100644 index 00000000000..f3a66808382 --- /dev/null +++ b/cmd/distributor/Dockerfile @@ -0,0 +1,4 @@ +FROM quay.io/prometheus/busybox:latest +COPY distributor /bin/distributor +EXPOSE 80 +ENTRYPOINT [ "/bin/distributor" ] diff --git a/cmd/distributor/main.go b/cmd/distributor/main.go new file mode 100644 index 00000000000..0f8392ccedc --- /dev/null +++ b/cmd/distributor/main.go @@ -0,0 +1,40 @@ +package main + +import ( + "flag" + "net/http" + + "github.com/prometheus/common/log" + + "github.com/weaveworks/cortex/distributor" + "github.com/weaveworks/cortex/ring" + "github.com/weaveworks/cortex/server" + "github.com/weaveworks/cortex/util" +) + +func main() { + var ( + serverConfig server.Config + ringConfig ring.Config + distributorConfig distributor.Config + ) + util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig) + flag.Parse() + + r, err := ring.New(ringConfig) + if err != nil { + log.Fatalf("Error initializing ring: %v", err) + } + defer r.Stop() + + dist, err := distributor.New(distributorConfig, r) + if err != nil { + log.Fatalf("Error initializing distributor: %v", err) + } + + server := server.New(serverConfig, r) + server.HTTP.Handle("/api/prom/push", http.HandlerFunc(dist.PushHandler)) + defer server.Stop() + + server.Run() +} diff --git a/cmd/ingester/Dockerfile b/cmd/ingester/Dockerfile new file mode 100644 index 00000000000..ae946b8724c --- /dev/null +++ b/cmd/ingester/Dockerfile @@ -0,0 +1,4 @@ +FROM quay.io/prometheus/busybox:latest +COPY ingester /bin/ingester +EXPOSE 80 +ENTRYPOINT [ "/bin/ingester" ] diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go new file mode 
100644 index 00000000000..ad2e0c9121f --- /dev/null +++ b/cmd/ingester/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "flag" + + "github.com/prometheus/common/log" + + "github.com/weaveworks/cortex" + "github.com/weaveworks/cortex/chunk" + "github.com/weaveworks/cortex/ingester" + "github.com/weaveworks/cortex/ring" + "github.com/weaveworks/cortex/server" + "github.com/weaveworks/cortex/util" +) + +func main() { + var ( + serverConfig server.Config + ingesterRegistrationConfig ring.IngesterRegistrationConfig + chunkStoreConfig chunk.StoreConfig + ingesterConfig ingester.Config + ) + // IngesterRegistrator needs to know our gRPC listen port + ingesterRegistrationConfig.ListenPort = &serverConfig.GRPCListenPort + util.RegisterFlags(&serverConfig, &ingesterRegistrationConfig, &chunkStoreConfig, &ingesterConfig) + flag.Parse() + + registration, err := ring.RegisterIngester(ingesterRegistrationConfig) + if err != nil { + log.Fatalf("Could not register ingester: %v", err) + } + defer registration.Ring.Stop() + + server := server.New(serverConfig, registration.Ring) + chunkStore := chunk.NewAWSStore(chunkStoreConfig) + ingester, err := ingester.New(ingesterConfig, chunkStore) + if err != nil { + log.Fatal(err) + } + cortex.RegisterIngesterServer(server.GRPC, ingester) + + // Deferring a func to make ordering obvious + defer func() { + registration.ChangeState(ring.IngesterState_LEAVING) + ingester.Stop() + registration.Unregister() + server.Stop() + }() + + server.Run() +} diff --git a/cmd/querier/Dockerfile b/cmd/querier/Dockerfile new file mode 100644 index 00000000000..9d654b1f941 --- /dev/null +++ b/cmd/querier/Dockerfile @@ -0,0 +1,4 @@ +FROM quay.io/prometheus/busybox:latest +COPY querier /bin/querier +EXPOSE 80 +ENTRYPOINT [ "/bin/querier" ] diff --git a/cmd/querier/main.go b/cmd/querier/main.go new file mode 100644 index 00000000000..e9f9c9899ec --- /dev/null +++ b/cmd/querier/main.go @@ -0,0 +1,70 @@ +package main + +import ( + "flag" + "fmt" + 
"net/http" + + "golang.org/x/net/context" + + "github.com/prometheus/common/log" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/web/api/v1" + + "github.com/weaveworks/cortex/chunk" + "github.com/weaveworks/cortex/distributor" + "github.com/weaveworks/cortex/querier" + "github.com/weaveworks/cortex/ring" + "github.com/weaveworks/cortex/server" + "github.com/weaveworks/cortex/ui" + "github.com/weaveworks/cortex/user" + "github.com/weaveworks/cortex/util" +) + +func main() { + var ( + serverConfig server.Config + ringConfig ring.Config + distributorConfig distributor.Config + chunkStoreConfig chunk.StoreConfig + ) + util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &chunkStoreConfig) + flag.Parse() + + r, err := ring.New(ringConfig) + if err != nil { + log.Fatalf("Error initializing ring: %v", err) + } + defer r.Stop() + + dist, err := distributor.New(distributorConfig, r) + if err != nil { + log.Fatalf("Error initializing distributor: %v", err) + } + + server := server.New(serverConfig, r) + defer server.Stop() + + chunkStore := chunk.NewAWSStore(chunkStoreConfig) + queryable := querier.NewQueryable(dist, chunkStore) + engine := promql.NewEngine(queryable, nil) + api := v1.NewAPI(engine, querier.DummyStorage{Queryable: queryable}) + promRouter := route.New(func(r *http.Request) (context.Context, error) { + userID := r.Header.Get(user.UserIDHeaderName) + if userID == "" { + return nil, fmt.Errorf("no %s header", user.UserIDHeaderName) + } + return user.WithID(r.Context(), userID), nil + }).WithPrefix("/api/prom/api/v1") + api.Register(promRouter) + + subrouter := server.HTTP.PathPrefix("/api/prom").Subrouter() + subrouter.PathPrefix("/api/v1").Handler(promRouter) + subrouter.Path("/validate_expr").Handler(http.HandlerFunc(dist.ValidateExprHandler)) + subrouter.Path("/user_stats").Handler(http.HandlerFunc(dist.UserStatsHandler)) + subrouter.Path("/graph").Handler(ui.GraphHandler()) 
+ subrouter.PathPrefix("/static/").Handler(ui.StaticAssetsHandler("/api/prom/static/")) + + server.Run() +} diff --git a/cmd/ruler/Dockerfile b/cmd/ruler/Dockerfile new file mode 100644 index 00000000000..dd2b2a38893 --- /dev/null +++ b/cmd/ruler/Dockerfile @@ -0,0 +1,4 @@ +FROM quay.io/prometheus/busybox:latest +COPY ruler /bin/ruler +EXPOSE 80 +ENTRYPOINT [ "/bin/ruler" ] diff --git a/cmd/ruler/main.go b/cmd/ruler/main.go new file mode 100644 index 00000000000..3f9ead84994 --- /dev/null +++ b/cmd/ruler/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "flag" + + "github.com/prometheus/common/log" + + "github.com/weaveworks/cortex/chunk" + "github.com/weaveworks/cortex/distributor" + "github.com/weaveworks/cortex/ring" + "github.com/weaveworks/cortex/ruler" + "github.com/weaveworks/cortex/server" + "github.com/weaveworks/cortex/util" +) + +func main() { + var ( + serverConfig server.Config + ringConfig ring.Config + distributorConfig distributor.Config + rulerConfig ruler.Config + chunkStoreConfig chunk.StoreConfig + ) + util.RegisterFlags(&serverConfig, &ringConfig, &distributorConfig, &rulerConfig, &chunkStoreConfig) + flag.Parse() + + chunkStore := chunk.NewAWSStore(chunkStoreConfig) + + r, err := ring.New(ringConfig) + if err != nil { + log.Fatalf("Error initializing ring: %v", err) + } + defer r.Stop() + + dist, err := distributor.New(distributorConfig, r) + if err != nil { + log.Fatalf("Error initializing distributor: %v", err) + } + + rulerServer, err := ruler.NewServer(rulerConfig, ruler.NewRuler(rulerConfig, dist, chunkStore)) + if err != nil { + log.Fatalf("Error initializing ruler: %v", err) + } + go rulerServer.Run() + defer rulerServer.Stop() + + server := server.New(serverConfig, r) + defer server.Stop() + server.Run() +} diff --git a/cmd/table-manager/Dockerfile b/cmd/table-manager/Dockerfile new file mode 100644 index 00000000000..37b8b2475fd --- /dev/null +++ b/cmd/table-manager/Dockerfile @@ -0,0 +1,4 @@ +FROM
quay.io/prometheus/busybox:latest +COPY table-manager /bin/table-manager +EXPOSE 9094 +ENTRYPOINT [ "/bin/table-manager" ] diff --git a/cmd/table-manager/main.go b/cmd/table-manager/main.go new file mode 100644 index 00000000000..440bc641c87 --- /dev/null +++ b/cmd/table-manager/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "flag" + + "github.com/prometheus/common/log" + + "github.com/weaveworks/cortex/chunk" + "github.com/weaveworks/cortex/server" + "github.com/weaveworks/cortex/util" +) + +func main() { + var ( + serverConfig = server.Config{} + tableManagerConfig = chunk.TableManagerConfig{} + ) + util.RegisterFlags(&serverConfig, &tableManagerConfig) + flag.Parse() + + tableManager, err := chunk.NewDynamoTableManager(tableManagerConfig) + if err != nil { + log.Fatalf("Error initializing DynamoDB table manager: %v", err) + } + tableManager.Start() + defer tableManager.Stop() + + server := server.New(serverConfig, nil) + defer server.Stop() + server.Run() +} diff --git a/distributor/distributor.go b/distributor/distributor.go index 7dbfe34ae37..29e056c1735 100644 --- a/distributor/distributor.go +++ b/distributor/distributor.go @@ -1,6 +1,7 @@ package distributor import ( + "flag" "fmt" "hash/fnv" "sync" @@ -38,6 +39,7 @@ var ( // forwards appends and queries to individual ingesters. 
type Distributor struct { cfg Config + ring ReadRing clientsMtx sync.RWMutex clients map[string]cortex.IngesterClient @@ -62,24 +64,31 @@ type ReadRing interface { // Config contains the configuration require to // create a Distributor type Config struct { - Ring ReadRing - ReplicationFactor int MinReadSuccesses int HeartbeatTimeout time.Duration RemoteTimeout time.Duration } +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.ReplicationFactor, "distributor.replication-factor", 3, "The number of ingesters to write to and read from.") + f.IntVar(&cfg.MinReadSuccesses, "distributor.min-read-successes", 2, "The minimum number of ingesters from which a read must succeed.") + f.DurationVar(&cfg.HeartbeatTimeout, "distributor.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") + f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 5*time.Second, "Timeout for downstream ingesters.") +} + // New constructs a new Distributor -func New(cfg Config) (*Distributor, error) { +func New(cfg Config, ring ReadRing) (*Distributor, error) { if 0 > cfg.ReplicationFactor { return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor) } if cfg.MinReadSuccesses > cfg.ReplicationFactor { return nil, fmt.Errorf("MinReadSuccesses > ReplicationFactor: %d > %d", cfg.MinReadSuccesses, cfg.ReplicationFactor) } - return &Distributor{ + d := &Distributor{ cfg: cfg, + ring: ring, clients: map[string]cortex.IngesterClient{}, queryDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", @@ -118,7 +127,9 @@ func New(cfg Config) (*Distributor, error) { Name: "distributor_ingester_query_failures_total", Help: "The total number of failed queries sent to ingesters.", }, []string{"ingester"}), - }, nil + } + prometheus.MustRegister(d) + return d, nil } func (d 
*Distributor) getClientFor(ingester *ring.IngesterDesc) (cortex.IngesterClient, error) { @@ -186,7 +197,7 @@ func (d *Distributor) Push(ctx context.Context, req *remote.WriteRequest) (*cort keys[i] = tokenForMetric(userID, sample.Metric) } - ingesters, err := d.cfg.Ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write) + ingesters, err := d.ring.BatchGet(keys, d.cfg.ReplicationFactor, ring.Write) if err != nil { return nil, err } @@ -297,7 +308,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . return err } - ingesters, err := d.cfg.Ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read) + ingesters, err := d.ring.Get(tokenFor(userID, metricName), d.cfg.ReplicationFactor, ring.Read) if err != nil { return err } @@ -360,7 +371,7 @@ func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers . // forAllIngesters runs f, in parallel, for all ingesters func (d *Distributor) forAllIngesters(f func(cortex.IngesterClient) (interface{}, error)) ([]interface{}, error) { resps, errs := make(chan interface{}), make(chan error) - ingesters := d.cfg.Ring.GetAll() + ingesters := d.ring.GetAll() for _, ingester := range ingesters { go func(ingester *ring.IngesterDesc) { client, err := d.getClientFor(ingester) @@ -474,7 +485,7 @@ func (d *Distributor) Describe(ch chan<- *prometheus.Desc) { d.queryDuration.Describe(ch) ch <- d.receivedSamples.Desc() d.sendDuration.Describe(ch) - d.cfg.Ring.Describe(ch) + d.ring.Describe(ch) ch <- numClientsDesc d.ingesterAppends.Describe(ch) d.ingesterAppendFailures.Describe(ch) @@ -487,7 +498,7 @@ func (d *Distributor) Collect(ch chan<- prometheus.Metric) { d.queryDuration.Collect(ch) ch <- d.receivedSamples d.sendDuration.Collect(ch) - d.cfg.Ring.Collect(ch) + d.ring.Collect(ch) d.ingesterAppends.Collect(ch) d.ingesterAppendFailures.Collect(ch) d.ingesterQueries.Collect(ch) diff --git a/ingester/http_server.go b/ingester/http_server.go deleted file mode 100644 index 
2629da2f5ce..00000000000 --- a/ingester/http_server.go +++ /dev/null @@ -1,94 +0,0 @@ -package ingester - -import ( - "net/http" - - "github.com/prometheus/common/log" - "github.com/prometheus/prometheus/storage/remote" - - "github.com/weaveworks/cortex" - "github.com/weaveworks/cortex/util" -) - -// PushHandler is a http.Handler that accepts proto encoded samples. -func (i *Ingester) PushHandler(w http.ResponseWriter, r *http.Request) { - var req remote.WriteRequest - ctx, abort := util.ParseProtoRequest(w, r, &req, true) - if abort { - return - } - - _, err := i.Push(ctx, &req) - if err != nil { - switch err { - case ErrOutOfOrderSample, ErrDuplicateSampleForTimestamp: - log.Warnf("append err: %v", err) - http.Error(w, err.Error(), http.StatusBadRequest) - default: - log.Errorf("append err: %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - } - } -} - -// QueryHandler is a http.Handler that accepts protobuf formatted -// query requests and serves them. -func (i *Ingester) QueryHandler(w http.ResponseWriter, r *http.Request) { - var req cortex.QueryRequest - ctx, abort := util.ParseProtoRequest(w, r, &req, false) - if abort { - return - } - - resp, err := i.Query(ctx, &req) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - util.WriteProtoResponse(w, resp) -} - -// LabelValuesHandler handles label values -func (i *Ingester) LabelValuesHandler(w http.ResponseWriter, r *http.Request) { - var req cortex.LabelValuesRequest - ctx, abort := util.ParseProtoRequest(w, r, &req, false) - if abort { - return - } - - resp, err := i.LabelValues(ctx, &req) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - util.WriteProtoResponse(w, resp) -} - -// UserStatsHandler handles user stats requests to the Ingester. 
-func (i *Ingester) UserStatsHandler(w http.ResponseWriter, r *http.Request) { - ctx, abort := util.ParseProtoRequest(w, r, nil, false) - if abort { - return - } - - resp, err := i.UserStats(ctx, &cortex.UserStatsRequest{}) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - util.WriteProtoResponse(w, resp) -} - -// ReadinessHandler returns 204 when the ingester is ready, -// 500 otherwise. It's used by kubernetes to indicate if the ingester -// pool is ready to have ingesters added / removed. -func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) { - if i.Ready() { - w.WriteHeader(http.StatusNoContent) - } else { - w.WriteHeader(http.StatusInternalServerError) - } -} diff --git a/ingester/ingester.go b/ingester/ingester.go index 50e531aa8c8..a4fc5ab7b03 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -1,6 +1,7 @@ package ingester import ( + "flag" "fmt" "sync" "time" @@ -90,11 +91,19 @@ type Config struct { MaxChunkAge time.Duration RateUpdatePeriod time.Duration ConcurrentFlushes int - GRPCListenPort int Ring *ring.Ring } +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") + f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.") + f.DurationVar(&cfg.MaxChunkIdle, "ingester.max-chunk-idle", 1*time.Hour, "Maximum chunk idle time before flushing.") + f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", 12*time.Hour, "Maximum chunk age time before flushing.") + f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", DefaultConcurrentFlush, "Number of concurrent goroutines flushing to dynamodb.") +} + type flushOp struct { from model.Time userID string @@ -165,6 +174,7 @@ func 
New(cfg Config, chunkStore cortex_chunk.Store) (*Ingester, error) { Help: "The total number of samples returned from queries.", }), } + prometheus.MustRegister(i) i.done.Add(cfg.ConcurrentFlushes) for j := 0; j < cfg.ConcurrentFlushes; j++ { diff --git a/push-images b/push-images new file mode 100755 index 00000000000..224d1ce5f3a --- /dev/null +++ b/push-images @@ -0,0 +1,18 @@ +#!/usr/bin/env bash + +set -o errexit +set -o nounset +set -o pipefail + +QUAY_PREFIX=quay.io/ +IMAGES=$(make images) +IMAGE_TAG=$(./tools/image-tag) + +for image in ${IMAGES}; do + docker push ${image}:latest + docker push ${image}:${IMAGE_TAG} + docker tag ${image}:latest ${QUAY_PREFIX}${image}:latest + docker tag ${image}:${IMAGE_TAG} ${QUAY_PREFIX}${image}:${IMAGE_TAG} + docker push ${QUAY_PREFIX}${image}:latest + docker push ${QUAY_PREFIX}${image}:${IMAGE_TAG} +done diff --git a/querier/querier.go b/querier/querier.go index da58ae0c889..136ee410fc5 100644 --- a/querier/querier.go +++ b/querier/querier.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/prometheus/common/log" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage/local" @@ -126,6 +127,7 @@ func (qm MergeQuerier) QueryRange(ctx context.Context, from, to model.Time, matc } } if lastErr != nil { + log.Errorf("Error in MergeQuerier.QueryRange: %v", lastErr) return nil, lastErr } diff --git a/ring/consul_client.go b/ring/consul_client.go index a4989438d2c..965cd5ab5f3 100644 --- a/ring/consul_client.go +++ b/ring/consul_client.go @@ -2,6 +2,7 @@ package ring import ( "encoding/json" + "flag" "fmt" "sync" "time" @@ -16,6 +17,20 @@ const ( longPollDuration = 10 * time.Second ) +// ConsulConfig to create a ConsulClient +type ConsulConfig struct { + Host string + Prefix string + + mock ConsulClient +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *ConsulConfig) RegisterFlags(f *flag.FlagSet) { + 
f.StringVar(&cfg.Host, "consul.hostname", "localhost:8500", "Hostname and port of Consul.") + f.StringVar(&cfg.Prefix, "consul.prefix", "collectors/", "Prefix for keys in Consul.") +} + // ConsulClient is a high-level client for Consul, that exposes operations // such as CAS and Watch which take callbacks. It also deals with serialisation // by having an instance factory passed in to methods and deserialising into that. @@ -48,18 +63,26 @@ type consulClient struct { } // NewConsulClient returns a new ConsulClient. -func NewConsulClient(addr string, codec Codec) (ConsulClient, error) { +func NewConsulClient(cfg ConsulConfig, codec Codec) (ConsulClient, error) { + if cfg.mock != nil { + return cfg.mock, nil + } + client, err := consul.NewClient(&consul.Config{ - Address: addr, + Address: cfg.Host, Scheme: "http", }) if err != nil { return nil, err } - return &consulClient{ + var c ConsulClient = &consulClient{ kv: client.KV(), codec: codec, - }, nil + } + if cfg.Prefix != "" { + c = PrefixClient(c, cfg.Prefix) + } + return c, nil } var ( diff --git a/ring/ingester_lifecycle.go b/ring/ingester_lifecycle.go index 0b322ef770e..bd8df5c359d 100644 --- a/ring/ingester_lifecycle.go +++ b/ring/ingester_lifecycle.go @@ -3,6 +3,7 @@ package ring import ( + "flag" "fmt" "math/rand" "net" @@ -21,8 +22,41 @@ const ( heartbeatInterval = 5 * time.Second ) +var ( + consulHeartbeats = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_ingester_consul_heartbeats_total", + Help: "The total number of heartbeats sent to consul.", + }) +) + +func init() { + prometheus.MustRegister(consulHeartbeats) +} + +// IngesterRegistrationConfig is the config for an IngesterRegistration +type IngesterRegistrationConfig struct { + Config + mock *Ring + + ListenPort *int + NumTokens int + + // For testing + Addr string + Hostname string + skipUnregister bool +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *IngesterRegistrationConfig) 
RegisterFlags(f *flag.FlagSet) { + cfg.Config.RegisterFlags(f) + f.IntVar(&cfg.NumTokens, "ingester.num-tokens", 128, "Number of tokens for each ingester.") +} + // IngesterRegistration manages the connection between the ingester and Consul. type IngesterRegistration struct { + Ring *Ring + consul ConsulClient numTokens int skipUnregister bool @@ -37,24 +71,21 @@ type IngesterRegistration struct { // back empty. Channel is used to tell the actor to update consul on state changes. state IngesterState stateChange chan IngesterState - - consulHeartbeats prometheus.Counter -} - -// IngesterRegistrationConfig is the config for an IngesterRegistration -type IngesterRegistrationConfig struct { - ListenPort int - NumTokens int - Codec *DynamicCodec - - // For testing - Addr string - Hostname string - skipUnregister bool } // RegisterIngester registers an ingester with Consul. -func RegisterIngester(consulClient ConsulClient, cfg IngesterRegistrationConfig) (*IngesterRegistration, error) { +func RegisterIngester(cfg IngesterRegistrationConfig) (*IngesterRegistration, error) { + var ring *Ring + if cfg.mock != nil { + ring = cfg.mock + } else { + var err error + ring, err = New(cfg.Config) + if err != nil { + return nil, err + } + } + hostname := cfg.Hostname if hostname == "" { var err error @@ -74,25 +105,22 @@ func RegisterIngester(consulClient ConsulClient, cfg IngesterRegistrationConfig) } r := &IngesterRegistration{ - consul: consulClient, + Ring: ring, + + consul: ring.consul, numTokens: cfg.NumTokens, skipUnregister: cfg.skipUnregister, - codec: cfg.Codec, + codec: ring.codec, id: hostname, // hostname is the ip+port of this instance, written to consul so // the distributors know where to connect. - addr: fmt.Sprintf("%s:%d", addr, cfg.ListenPort), + addr: fmt.Sprintf("%s:%d", addr, *cfg.ListenPort), quit: make(chan struct{}), // Only read/written on actor goroutine. 
state: IngesterState_ACTIVE, stateChange: make(chan IngesterState), - - consulHeartbeats: prometheus.NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingester_consul_heartbeats_total", - Help: "The total number of heartbeats sent to consul.", - }), } r.wait.Add(1) @@ -215,7 +243,7 @@ func (r *IngesterRegistration) heartbeat(tokens []uint32) { log.Errorf("Failed to write to consul, sleeping: %v", err) } case <-ticker.C: - r.consulHeartbeats.Inc() + consulHeartbeats.Inc() if err := r.consul.CAS(consulKey, updateConsul); err != nil { log.Errorf("Failed to write to consul, sleeping: %v", err) } @@ -292,13 +320,3 @@ func getFirstAddressOf(name string) (string, error) { return "", fmt.Errorf("No address found for %s", name) } - -// Describe implements prometheus.Collector. -func (r *IngesterRegistration) Describe(ch chan<- *prometheus.Desc) { - ch <- r.consulHeartbeats.Desc() -} - -// Collect implements prometheus.Collector. -func (r *IngesterRegistration) Collect(ch chan<- prometheus.Metric) { - ch <- r.consulHeartbeats -} diff --git a/ring/ingester_lifecycle_test.go b/ring/ingester_lifecycle_test.go index 655d6134a3d..2ebefc96930 100644 --- a/ring/ingester_lifecycle_test.go +++ b/ring/ingester_lifecycle_test.go @@ -9,14 +9,24 @@ import ( func TestIngesterRestart(t *testing.T) { consul := newMockConsulClient() - ring := New(consul, time.Second) + ring, err := New(Config{ + ConsulConfig: ConsulConfig{ + mock: consul, + }, + }) + if err != nil { + t.Fatal(err) + } { - registra, err := RegisterIngester(consul, IngesterRegistrationConfig{ - NumTokens: 1, - Addr: "localhost", - Hostname: "localhost", + registra, err := RegisterIngester(IngesterRegistrationConfig{ + mock: ring, skipUnregister: true, + + NumTokens: 1, + ListenPort: func(i int) *int { return &i }(0), + Addr: "localhost", + Hostname: "localhost", }) if err != nil { t.Fatal(err) @@ -29,11 +39,14 @@ func TestIngesterRestart(t *testing.T) { }) { - registra, err := RegisterIngester(consul, 
IngesterRegistrationConfig{ - NumTokens: 1, - Addr: "localhost", - Hostname: "localhost", + registra, err := RegisterIngester(IngesterRegistrationConfig{ + mock: ring, skipUnregister: true, + + NumTokens: 1, + ListenPort: func(i int) *int { return &i }(0), + Addr: "localhost", + Hostname: "localhost", }) if err != nil { t.Fatal(err) diff --git a/ring/ring.go b/ring/ring.go index 66547463629..e9217d03d72 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -4,6 +4,7 @@ package ring import ( "errors" + "flag" "math" "sort" "sync" @@ -35,9 +36,24 @@ func (x uint32s) Swap(i, j int) { x[i], x[j] = x[j], x[i] } // ErrEmptyRing is the error returned when trying to get an element when nothing has been added to hash. var ErrEmptyRing = errors.New("empty circle") +// Config for a Ring +type Config struct { + ConsulConfig + + HeartbeatTimeout time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + cfg.ConsulConfig.RegisterFlags(f) + + f.DurationVar(&cfg.HeartbeatTimeout, "ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") +} + // Ring holds the information about the members of the consistent hash circle. 
type Ring struct { consul ConsulClient + codec *DynamicCodec quit, done chan struct{} heartbeatTimeout time.Duration @@ -50,10 +66,19 @@ type Ring struct { } // New creates a new Ring -func New(consul ConsulClient, heartbeatTimeout time.Duration) *Ring { +func New(cfg Config) (*Ring, error) { + codec := NewDynamicCodec( + JSONCodec{Factory: DescFactory}, + ProtoCodec{Factory: ProtoDescFactory}, + ) + consul, err := NewConsulClient(cfg.ConsulConfig, codec) + if err != nil { + return nil, err + } r := &Ring{ consul: consul, - heartbeatTimeout: heartbeatTimeout, + codec: codec, + heartbeatTimeout: cfg.HeartbeatTimeout, quit: make(chan struct{}), done: make(chan struct{}), ringDesc: &Desc{}, @@ -74,7 +99,7 @@ func New(consul ConsulClient, heartbeatTimeout time.Duration) *Ring { ), } go r.loop() - return r + return r, nil } // Stop the distributor. diff --git a/ruler/ruler.go b/ruler/ruler.go index 0d4c1e7ced9..53cbafdfaf0 100644 --- a/ruler/ruler.go +++ b/ruler/ruler.go @@ -1,6 +1,7 @@ package ruler import ( + "flag" "fmt" "net/url" "time" @@ -15,6 +16,7 @@ import ( "github.com/weaveworks/cortex/distributor" "github.com/weaveworks/cortex/querier" "github.com/weaveworks/cortex/user" + "github.com/weaveworks/cortex/util" ) var ( @@ -43,19 +45,31 @@ func init() { // Config is the configuration for the recording rules server. type Config struct { - ConfigsAPIURL string + ConfigsAPIURL util.URLValue + // HTTP timeout duration for requests made to the Weave Cloud configs // service. ClientTimeout time.Duration + // This is used for template expansion in alerts. Because we don't support // alerts yet, this value doesn't matter. However, it must be a valid URL // in order to navigate Prometheus's code paths. - ExternalURL string + ExternalURL util.URLValue + // How frequently to evaluate rules by default. 
EvaluationInterval time.Duration NumWorkers int } +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "URL of configs API server.") + f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.") + f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 15*time.Second, "How frequently to evaluate rules") + f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "Timeout for requests to Weave Cloud configs service.") + f.IntVar(&cfg.NumWorkers, "ruler.num-workers", 1, "Number of rule evaluator worker routines in this process") +} + // Ruler evaluates rules. type Ruler struct { engine *promql.Engine @@ -64,8 +78,8 @@ type Ruler struct { } // NewRuler creates a new ruler from a distributor and chunk store. -func NewRuler(d *distributor.Distributor, c chunk.Store, alertURL *url.URL) Ruler { - return Ruler{querier.NewEngine(d, c), d, alertURL} +func NewRuler(cfg Config, d *distributor.Distributor, c chunk.Store) Ruler { + return Ruler{querier.NewEngine(d, c), d, cfg.ExternalURL.URL} } func (r *Ruler) newGroup(ctx context.Context, rs []rules.Rule) *rules.Group { @@ -101,11 +115,7 @@ type Server struct { // NewServer makes a new rule processing server. func NewServer(cfg Config, ruler Ruler) (*Server, error) { - configsAPIURL, err := url.Parse(cfg.ConfigsAPIURL) - if err != nil { - return nil, err - } - c := configsAPI{configsAPIURL, cfg.ClientTimeout} + c := configsAPI{cfg.ConfigsAPIURL.URL, cfg.ClientTimeout} // TODO: Separate configuration for polling interval. 
s := newScheduler(c, cfg.EvaluationInterval, cfg.EvaluationInterval) if cfg.NumWorkers <= 0 { diff --git a/server/server.go b/server/server.go new file mode 100644 index 00000000000..d2b54738659 --- /dev/null +++ b/server/server.go @@ -0,0 +1,119 @@ +package server + +import ( + "flag" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "syscall" + + "github.com/gorilla/mux" + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/mwitkow/go-grpc-middleware" + "github.com/opentracing-contrib/go-stdlib/nethttp" + "github.com/opentracing/opentracing-go" + "github.com/weaveworks/scope/common/middleware" + "google.golang.org/grpc" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + + "github.com/weaveworks/cortex/ring" + cortex_grpc_middleware "github.com/weaveworks/cortex/util/middleware" +) + +var ( + requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "request_duration_seconds", + Help: "Time (in seconds) spent serving HTTP requests.", + Buckets: prometheus.DefBuckets, + }, []string{"method", "route", "status_code", "ws"}) +) + +func init() { + prometheus.MustRegister(requestDuration) +} + +// Config for a Server +type Config struct { + LogSuccess bool + HTTPListenPort int + GRPCListenPort int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.LogSuccess, "server.log-success", false, "Log successful requests") + f.IntVar(&cfg.HTTPListenPort, "server.http-listen-port", 9094, "HTTP server listen port.") + f.IntVar(&cfg.GRPCListenPort, "server.grpc-listen-port", 9095, "gRPC server listen port.") +} + +// Server wraps a HTTP and gRPC server, and some common initialization. 
+type Server struct { + cfg Config + + HTTP *mux.Router + GRPC *grpc.Server +} + +// New makes a new Server +func New(cfg Config, r *ring.Ring) *Server { + router := mux.NewRouter() + if r != nil { + router.Handle("/ring", r) + } + router.Handle("/metrics", prometheus.Handler()) + + grpcServer := grpc.NewServer( + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + cortex_grpc_middleware.ServerLoggingInterceptor(cfg.LogSuccess), + cortex_grpc_middleware.ServerInstrumentInterceptor(requestDuration), + otgrpc.OpenTracingServerInterceptor(opentracing.GlobalTracer()), + cortex_grpc_middleware.ServerUserHeaderInterceptor, + )), + ) + + return &Server{ + cfg: cfg, + HTTP: router, + GRPC: grpcServer, + } +} + +// Run the server; blocks until SIGTERM is received. +func (s *Server) Run() { + // Setup HTTP server + instrumented := middleware.Merge( + middleware.Func(func(handler http.Handler) http.Handler { + return nethttp.Middleware(opentracing.GlobalTracer(), handler) + }), + middleware.Log{ + LogSuccess: s.cfg.LogSuccess, + }, + middleware.Instrument{ + Duration: requestDuration, + RouteMatcher: s.HTTP, + }, + ).Wrap(s.HTTP) + go http.ListenAndServe(fmt.Sprintf(":%d", s.cfg.HTTPListenPort), instrumented) + + // Setup gRPC server + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", s.cfg.GRPCListenPort)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + go s.GRPC.Serve(lis) + + term := make(chan os.Signal) + signal.Notify(term, os.Interrupt, syscall.SIGTERM) + <-term + log.Warn("Received SIGTERM, exiting gracefully...") +} + +// Stop the server. Does not unblock Run! 
+func (s *Server) Stop() { + s.GRPC.Stop() +} diff --git a/util/flags.go b/util/flags.go new file mode 100644 index 00000000000..963460b7d73 --- /dev/null +++ b/util/flags.go @@ -0,0 +1,79 @@ +package util + +import ( + "flag" + "net/url" + "time" + + "github.com/prometheus/common/model" +) + +// Registerer is a thing that can RegisterFlags +type Registerer interface { + RegisterFlags(*flag.FlagSet) +} + +// RegisterFlags registers flags with the provided Registerers +func RegisterFlags(rs ...Registerer) { + for _, r := range rs { + r.RegisterFlags(flag.CommandLine) + } +} + +// TimeValue is a time.Time that can be used as a flag. +type TimeValue struct { + time.Time +} + +// String implements flag.Value +func (v TimeValue) String() string { + return v.Time.Format(time.RFC3339) +} + +// Set implements flag.Value +func (v *TimeValue) Set(s string) error { + var err error + v.Time, err = time.Parse(time.RFC3339, s) + return err +} + +// DayValue is a model.Time that can be used as a flag. +// NB it only parses days! +type DayValue struct { + model.Time +} + +// String implements flag.Value +func (v DayValue) String() string { + return v.Time.Time().Format(time.RFC3339) +} + +// Set implements flag.Value +func (v *DayValue) Set(s string) error { + t, err := time.Parse("2006-01-02", s) + if err != nil { + return err + } + v.Time = model.TimeFromUnix(t.Unix()) + return nil +} + +// URLValue is a url.URL that can be used as a flag. +type URLValue struct { + *url.URL +} + +// String implements flag.Value +func (v URLValue) String() string { + return v.URL.String() +} + +// Set implements flag.Value +func (v *URLValue) Set(s string) error { + u, err := url.Parse(s) + if err != nil { + return err + } + v.URL = u + return nil +}