Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-kit/kit/log/level"
"google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // get gzip compressor registered
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
Expand Down Expand Up @@ -95,6 +96,7 @@ func main() {
defer ingester.Shutdown()

client.RegisterIngesterServer(server.GRPC, ingester)
grpc_health_v1.RegisterHealthServer(server.GRPC, ingester)
server.HTTP.Path("/ready").Handler(http.HandlerFunc(ingester.ReadinessHandler))
server.HTTP.Path("/flush").Handler(http.HandlerFunc(ingester.FlushHandler))
server.Run()
Expand Down
4 changes: 4 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ For more details on the Cortex architecture, you should read / watch:

The query frontend is an optional job which accepts HTTP requests and queues them by tenant ID, retrying them on errors. This allows for the occasional large query which would otherwise cause a querier OOM, allowing us to over-provision querier parallelism. Also, it prevents multiple large requests from being convoyed on a single querier by distributing them FIFO across all queriers. And finally, it prevents a single tenant from DoSing other tenants by fairly scheduling queries between tenants.

The query frontend will also split multi-day queries into multiple single-day queries, executing these queries in parallel on downstream queriers, and stitching the results back together again. This prevents large, multi-day queries from OOMing a single querier, and helps them execute faster.

Finally, the query frontend will also cache query results and reuse them on subsequent queries. If the cached results are incomplete, the query frontend will calculate the required queries and execute them in parallel on downstream queriers. The query frontend can optionally align queries with their step parameter, to improve the cacheability of the query results.

The query frontend job accepts gRPC streaming requests from the queriers, which then "pull" requests from the frontend. For HA it is recommended you run multiple frontends - the queriers will connect to (and pull requests from) all of them. To get the benefit of the fair scheduling, it is recommended you run fewer frontends than queriers - two should suffice.

See the document "[Cortex Query Woes](https://docs.google.com/document/d/1lsvSkv0tiAMPQv-V8vI2LZ8f4i9JuTRsuPI_i-XcAqY)" for a more detailed design discussion. In the future, query splitting, query alignment and query results caching will be added to the frontend.
Expand Down
22 changes: 22 additions & 0 deletions docs/arguments.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,27 @@
# Cortex Arguments Explained

## Query Frontend

- `-querier.align-querier-with-step`

If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results.

- `-querier.split-queries-by-day`

If set to true, will cause the query frontend to split multi-day queries into multiple single-day queries and execute them in parallel.

- `-querier.cache-results`

If set to true, will cause the query frontend to cache query results. The cache will be used to answer future, overlapping queries. The query frontend calculates extra queries required to fill gaps in the cache.

- `-frontend.max-cache-freshness`

When caching query results, it is desirable to prevent the caching of very recent results that might still be in flux. Use this parameter to configure the age of results that should be excluded.

- `-memcached.{hostname, service, timeout}`

Use these flags to specify the location and timeout of the memcached cluster used to cache query results.

## Distributor

- `-distributor.shard-by-all-labels`
Expand Down
42 changes: 1 addition & 41 deletions pkg/chunk/cache/background_test.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,16 @@
package cache_test

import (
"context"
"sync"
"testing"

"github.com/weaveworks/cortex/pkg/chunk/cache"
)

// mockCache is a simple in-memory Cache implementation used by the
// tests in this file. Identical code exists as cache.NewMockCache in
// pkg/chunk/cache/mock.go; this copy is the test-local version.
type mockCache struct {
	sync.Mutex // guards cache against concurrent Store/Fetch calls
	cache map[string][]byte
}

// Store writes each key/value pair into the in-memory map.
func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) {
	m.Lock()
	defer m.Unlock()
	for i := range keys {
		m.cache[keys[i]] = bufs[i]
	}
}

// Fetch partitions keys into those present in the map (returned with
// their values, in order) and those missing from it.
func (m *mockCache) Fetch(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string) {
	m.Lock()
	defer m.Unlock()
	for _, key := range keys {
		buf, ok := m.cache[key]
		if ok {
			found = append(found, key)
			bufs = append(bufs, buf)
		} else {
			missing = append(missing, key)
		}
	}
	return
}

// Stop is a no-op for the in-memory mock.
func (m *mockCache) Stop() error {
	return nil
}

// newMockCache returns an empty, ready-to-use mockCache.
func newMockCache() cache.Cache {
	return &mockCache{
		cache: map[string][]byte{},
	}
}

func TestBackground(t *testing.T) {
c := cache.NewBackground(cache.BackgroundConfig{
WriteBackGoroutines: 1,
WriteBackBuffer: 100,
}, newMockCache())
}, cache.NewMockCache())

keys, chunks := fillCache(t, c)
cache.Flush(c)
Expand Down
5 changes: 5 additions & 0 deletions pkg/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,8 @@ func TestFifoCache(t *testing.T) {
cache := cache.NewFifoCache("test", 1e3, 1*time.Hour)
testCache(t, cache)
}

func TestSnappyCache(t *testing.T) {
cache := cache.NewSnappy(cache.NewMockCache())
testCache(t, cache)
}
13 changes: 12 additions & 1 deletion pkg/chunk/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cache

import (
"context"
"encoding/hex"
"flag"
"hash/fnv"
"sync"
"time"

Expand Down Expand Up @@ -211,7 +213,7 @@ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) {
if err != nil {
sp := opentracing.SpanFromContext(ctx)
sp.LogFields(otlog.Error(err))
level.Error(util.Logger).Log("msg", "failed to put to diskcache", "err", err)
level.Error(util.Logger).Log("msg", "failed to put to memcached", "err", err)
}
}
}
Expand All @@ -226,3 +228,12 @@ func (c *Memcached) Stop() error {
c.wg.Wait()
return nil
}

// HashKey hashes key into something you can store in memcached.
func HashKey(key string) string {
	h := fnv.New64a()
	_, _ = h.Write([]byte(key)) // fnv hashers never return an error

	// Memcached errors on some of the raw bytes the hash produces, so
	// hex-encode the digest before using it as a key.
	return hex.EncodeToString(h.Sum(nil))
}
45 changes: 45 additions & 0 deletions pkg/chunk/cache/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cache

import (
"context"
"sync"
)

// mockCache is a trivial in-memory Cache implementation backed by a
// mutex-guarded map, intended for use in tests.
type mockCache struct {
	sync.Mutex // guards cache against concurrent Store/Fetch calls
	cache map[string][]byte
}

// Store writes each key/value pair into the in-memory map.
func (m *mockCache) Store(_ context.Context, keys []string, bufs [][]byte) {
	m.Lock()
	defer m.Unlock()
	for i, key := range keys {
		m.cache[key] = bufs[i]
	}
}

// Fetch partitions keys into those present in the map (returned with
// their values, in order) and those missing from it.
func (m *mockCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) {
	m.Lock()
	defer m.Unlock()
	var (
		found   []string
		bufs    [][]byte
		missing []string
	)
	for _, key := range keys {
		if buf, ok := m.cache[key]; ok {
			found = append(found, key)
			bufs = append(bufs, buf)
		} else {
			missing = append(missing, key)
		}
	}
	return found, bufs, missing
}

// Stop is a no-op for the in-memory mock.
func (m *mockCache) Stop() error {
	return nil
}

// NewMockCache makes a new MockCache
func NewMockCache() Cache {
	return &mockCache{
		cache: map[string][]byte{},
	}
}
47 changes: 47 additions & 0 deletions pkg/chunk/cache/snappy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cache

import (
"context"

"github.com/go-kit/kit/log/level"
"github.com/golang/snappy"
"github.com/weaveworks/cortex/pkg/util"
)

// snappyCache wraps another Cache, transparently snappy-compressing
// values on Store and decompressing them on Fetch.
type snappyCache struct {
	next Cache
}

// NewSnappy makes a new snappy encoding cache wrapper.
func NewSnappy(next Cache) Cache {
	return &snappyCache{
		next: next,
	}
}

// Store snappy-compresses every buffer, then hands the batch to the
// wrapped cache under the original keys.
func (s *snappyCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
	compressed := make([][]byte, len(bufs))
	for i, buf := range bufs {
		compressed[i] = snappy.Encode(nil, buf)
	}
	s.next.Store(ctx, keys, compressed)
}

// Fetch retrieves entries from the wrapped cache and snappy-decompresses
// each returned buffer. If any buffer fails to decode, the whole batch is
// reported as missing so callers fall back to the backing store.
func (s *snappyCache) Fetch(ctx context.Context, keys []string) ([]string, [][]byte, []string) {
	found, bufs, missing := s.next.Fetch(ctx, keys)
	decoded := make([][]byte, 0, len(bufs))
	for _, buf := range bufs {
		d, err := snappy.Decode(nil, buf)
		if err != nil {
			level.Error(util.Logger).Log("msg", "failed to decode cache entry", "err", err)
			return nil, nil, keys
		}
		decoded = append(decoded, d)
	}
	return found, decoded, missing
}

// Stop shuts down the wrapped cache.
func (s *snappyCache) Stop() error {
	return s.next.Stop()
}
4 changes: 2 additions & 2 deletions pkg/chunk/cache/tiered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ func TestTieredSimple(t *testing.T) {
for i := 1; i < 10; i++ {
caches := []cache.Cache{}
for j := 0; j <= i; j++ {
caches = append(caches, newMockCache())
caches = append(caches, cache.NewMockCache())
}
cache := cache.NewTiered(caches)
testCache(t, cache)
}
}

func TestTiered(t *testing.T) {
level1, level2 := newMockCache(), newMockCache()
level1, level2 := cache.NewMockCache(), cache.NewMockCache()
cache := cache.NewTiered([]cache.Cache{level1, level2})

level1.Store(context.Background(), []string{"key1"}, [][]byte{[]byte("hello")})
Expand Down
Loading