From ad65b8b3155f7b882ef9c8183d271057ba66c1ec Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Sun, 4 Feb 2018 09:24:46 +0100 Subject: [PATCH 01/12] Refactor memcache(d) into chunk package. - Caches don't know about chunk anymore, its all just []byte. - Deal with empty memcached host by returning a noopCache, not special casing nil client. - Make background writes a cache 'middleware'. - s/memcache/memcached/. - Refactor tests. --- pkg/chunk/cache/cache.go | 125 ++++++++++ pkg/chunk/cache/cache_test.go | 122 +++++++++ pkg/chunk/cache/memcached.go | 140 +++++++++++ .../memcached_client.go} | 30 ++- pkg/chunk/cache/memcached_test.go | 39 +++ pkg/chunk/chunk_cache.go | 231 ------------------ pkg/chunk/chunk_cache_test.go | 114 --------- pkg/chunk/chunk_store.go | 49 +++- 8 files changed, 487 insertions(+), 363 deletions(-) create mode 100644 pkg/chunk/cache/cache.go create mode 100644 pkg/chunk/cache/cache_test.go create mode 100644 pkg/chunk/cache/memcached.go rename pkg/chunk/{memcache_client.go => cache/memcached_client.go} (75%) create mode 100644 pkg/chunk/cache/memcached_test.go delete mode 100644 pkg/chunk/chunk_cache.go delete mode 100644 pkg/chunk/chunk_cache_test.go diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go new file mode 100644 index 00000000000..ac5ceb6face --- /dev/null +++ b/pkg/chunk/cache/cache.go @@ -0,0 +1,125 @@ +package cache + +import ( + "context" + "flag" + "sync" + + "github.com/go-kit/kit/log/level" + "github.com/weaveworks/cortex/pkg/util" +) + +// Cache byte arrays by key. +type Cache interface { + StoreChunk(ctx context.Context, key string, buf []byte) error + FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) + Stop() error +} + +// Config for building Caches. +type Config struct { + WriteBackGoroutines int + WriteBackBuffer int + + memcache MemcachedConfig + memcacheClient MemcachedClientConfig + + disk DiskcacheConfig +} + +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") + f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") + + cfg.memcache.RegisterFlags(f) + cfg.memcacheClient.RegisterFlags(f) + cfg.disk.RegisterFlags(f) +} + +func New(cfg Config) Cache { + if cfg.memcacheClient.Host == "" { + return noopCache{} + } + + client := newMemcachedClient(cfg.memcacheClient) + return NewMemcached(cfg.memcache, client) +} + +type backgroundCache struct { + Cache + + wg sync.WaitGroup + quit chan struct{} + bgWrites chan backgroundWrite +} + +type backgroundWrite struct { + key string + buf []byte +} + +func NewBackground(cfg Config) Cache { + c := &backgroundCache{ + quit: make(chan struct{}), + bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), + } + + c.wg.Add(cfg.WriteBackGoroutines) + for i := 0; i < cfg.WriteBackGoroutines; i++ { + go c.writeBackLoop() + } + + return c +} + +// Stop the background flushing goroutines. +func (c *backgroundCache) Stop() error { + close(c.quit) + c.wg.Wait() + + return c.Cache.Stop() +} + +// BackgroundWrite writes chunks for the cache in the background +func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + bgWrite := backgroundWrite{ + key: key, + buf: buf, + } + select { + case c.bgWrites <- bgWrite: + default: + memcacheDroppedWriteBack.Inc() + } + return nil +} + +func (c *backgroundCache) writeBackLoop() { + defer c.wg.Done() + + for { + select { + case bgWrite := <-c.bgWrites: + err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) + if err != nil { + level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err) + } + case <-c.quit: + return + } + } +} + +type noopCache struct{} + +func (noopCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + return nil +} + +func (noopCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) { + return nil, nil, nil +} + +func (noopCache) Stop() error { + return nil +} diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go new file mode 100644 index 00000000000..8ecd041a984 --- /dev/null +++ b/pkg/chunk/cache/cache_test.go @@ -0,0 +1,122 @@ +package cache_test + +import ( + "context" + "math/rand" + "os" + "path" + "strconv" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/cache" + prom_chunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" +) + +func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { + const ( + userID = "1" + chunkLen = 13 * 3600 // in seconds + ) + + // put 100 chunks from 0 to 99 + keys := []string{} + chunks := []chunk.Chunk{} + for i := 0; i < 100; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + promChunk, _ := prom_chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(i), + }) + c := chunk.NewChunk( + userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + promChunk[0], + ts, + ts.Add(chunkLen), + ) + + buf, err := c.Encode() + require.NoError(t, err) + + key := c.ExternalKey() + err = cache.StoreChunk(context.Background(), key, buf) + require.NoError(t, err) + + keys = append(keys, key) + chunks = append(chunks, c) + } + + return keys, chunks +} + +func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) { + for i := 0; i < 100; i++ { + index := rand.Intn(len(keys)) + key := keys[index] + + found, bufs, err := cache.FetchChunkData(context.Background(), []string{key}) + require.NoError(t, err) + require.Len(t, found, 1) + require.Len(t, bufs, 1) + + foundChunks, missing, err := chunk.ProcessCacheResponse([]chunk.Chunk{chunks[index]}, found, bufs) + require.NoError(t, err) + require.Empty(t, missing) + require.Equal(t, chunks[index], foundChunks[0]) + } +} + +func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) { + // test getting them all + found, bufs, err := cache.FetchChunkData(context.Background(), keys) + require.NoError(t, err) + require.Len(t, found, len(keys)) + require.Len(t, bufs, len(keys)) + + foundChunks, missing, err := chunk.ProcessCacheResponse(chunks, found, bufs) + require.NoError(t, err) + require.Empty(t, missing) + require.Equal(t, chunks, foundChunks) +} + +func testCacheMiss(t *testing.T, cache cache.Cache) { + for i := 0; i < 100; i++ { + key := strconv.Itoa(rand.Int()) + found, bufs, err := cache.FetchChunkData(context.Background(), []string{key}) + require.NoError(t, err) + require.Empty(t, found) + require.Empty(t, bufs) + } +} + +func testCache(t *testing.T, cache cache.Cache) { + keys, chunks := fillCache(t, cache) + testCacheSingle(t, cache, keys, chunks) + testCacheMultiple(t, cache, keys, chunks) + testCacheMiss(t, cache) +} + +func TestMemcache(t *testing.T) { + cache := cache.NewMemcached(cache.MemcachedConfig{}, newMockMemcache()) + testCache(t, cache) +} + +func TestDiskcache(t *testing.T) { + dirname := os.TempDir() + filename := path.Join(dirname, "diskcache") + defer os.RemoveAll(filename) + + cache, err := cache.NewDiskcache(cache.DiskcacheConfig{ + Path: filename, + Size: 100 * 1024 * 1024, + }) + require.NoError(t, err) + testCache(t, cache) +} diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go new file mode 100644 index 00000000000..3254d6010f0 --- /dev/null +++ b/pkg/chunk/cache/memcached.go @@ -0,0 +1,140 @@ +package cache + +import ( + "context" + "flag" + "time" + + "github.com/bradfitz/gomemcache/memcache" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/instrument" +) + +var ( + memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_requests_total", + Help: "Total count of chunks requested from memcache.", + }) + + memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_hits_total", + Help: "Total count of chunks found in memcache.", + }) + + memcacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_corrupt_chunks_total", + Help: "Total count of corrupt chunks found in memcache.", + }) + + memcacheDroppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "memcache_dropped_write_back", + Help: "Total count of dropped write backs to memcache.", + }) + + memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "memcache_request_duration_seconds", + Help: "Total time spent in seconds doing memcache requests.", + // Memecache requests are very quick: smallest bucket is 16us, biggest is 1s + Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), + }, []string{"method", "status_code"}) +) + +func init() { + prometheus.MustRegister(memcacheRequests) + prometheus.MustRegister(memcacheHits) + prometheus.MustRegister(memcacheCorrupt) + prometheus.MustRegister(memcacheDroppedWriteBack) + prometheus.MustRegister(memcacheRequestDuration) +} + +// MemcachedConfig is config to make a Memcached +type MemcachedConfig struct { + Expiration time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") +} + +// Memcached type caches chunks in memcached +type Memcached struct { + cfg MemcachedConfig + memcache MemcachedClient +} + +// NewMemcached makes a new Memcache +func NewMemcached(cfg MemcachedConfig, client MemcachedClient) *Memcached { + c := &Memcached{ + cfg: cfg, + memcache: client, + } + return c +} + +func memcacheStatusCode(err error) string { + // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables + switch err { + case nil: + return "200" + case memcache.ErrCacheMiss: + return "404" + case memcache.ErrMalformedKey: + return "400" + default: + return "500" + } +} + +// FetchChunkData gets chunks from the chunk cache. +func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) { + sp, ctx := ot.StartSpanFromContext(ctx, "FetchChunkData") + defer sp.Finish() + sp.LogFields(otlog.Int("chunks requested", len(keys))) + memcacheRequests.Add(float64(len(keys))) + + var items map[string]*memcache.Item + err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + var err error + items, err = c.memcache.GetMulti(keys) + return err + }) + if err != nil { + return + } + sp.LogFields(otlog.Int("chunks returned", len(items))) + + for _, key := range keys { + item, ok := items[key] + if ok { + found = append(found, key) + bufs = append(bufs, item.Value) + } + } + sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found))) + memcacheHits.Add(float64(len(found))) + return +} + +// StoreChunk serializes and stores a chunk in the chunk cache. +func (c *Memcached) StoreChunk(ctx context.Context, key string, buf []byte) error { + return instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + item := memcache.Item{ + Key: key, + Value: buf, + Expiration: int32(c.cfg.Expiration.Seconds()), + } + return c.memcache.Set(&item) + }) +} + +func (*Memcached) Stop() error { + return nil +} diff --git a/pkg/chunk/memcache_client.go b/pkg/chunk/cache/memcached_client.go similarity index 75% rename from pkg/chunk/memcache_client.go rename to pkg/chunk/cache/memcached_client.go index f88556479ba..43a52fffab3 100644 --- a/pkg/chunk/memcache_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -1,4 +1,4 @@ -package chunk +package cache import ( "flag" @@ -13,9 +13,15 @@ import ( "github.com/weaveworks/cortex/pkg/util" ) -// MemcacheClient is a memcache client that gets its server list from SRV +// MemcachedClient interface exists for mocking memcacheClient. +type MemcachedClient interface { + GetMulti(keys []string) (map[string]*memcache.Item, error) + Set(item *memcache.Item) error +} + +// memcachedClient is a memcache client that gets its server list from SRV // records, and periodically updates that ServerList. -type MemcacheClient struct { +type memcachedClient struct { *memcache.Client serverList *memcache.ServerList hostname string @@ -25,8 +31,8 @@ type MemcacheClient struct { wait sync.WaitGroup } -// MemcacheConfig defines how a MemcacheClient should be constructed. -type MemcacheConfig struct { +// MemcachedClientConfig defines how a MemcachedClient should be constructed. +type MemcachedClientConfig struct { Host string Service string Timeout time.Duration @@ -34,21 +40,21 @@ type MemcacheConfig struct { } // RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *MemcacheConfig) RegisterFlags(f *flag.FlagSet) { +func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Host, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") f.StringVar(&cfg.Service, "memcached.service", "memcached", "SRV service used to discover memcache servers.") f.DurationVar(&cfg.Timeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") f.DurationVar(&cfg.UpdateInterval, "memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") } -// NewMemcacheClient creates a new MemcacheClient that gets its server list +// newMemcachedClient creates a new MemcacheClient that gets its server list // from SRV and updates the server list on a regular basis. -func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient { +func newMemcachedClient(cfg MemcachedClientConfig) *memcachedClient { var servers memcache.ServerList client := memcache.NewFromSelector(&servers) client.Timeout = cfg.Timeout - newClient := &MemcacheClient{ + newClient := &memcachedClient{ Client: client, serverList: &servers, hostname: cfg.Host, @@ -66,12 +72,12 @@ func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient { } // Stop the memcache client. -func (c *MemcacheClient) Stop() { +func (c *memcachedClient) Stop() { close(c.quit) c.wait.Wait() } -func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error { +func (c *memcachedClient) updateLoop(updateInterval time.Duration) error { defer c.wait.Done() ticker := time.NewTicker(updateInterval) var err error @@ -90,7 +96,7 @@ func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error { // updateMemcacheServers sets a memcache server list from SRV records. SRV // priority & weight are ignored. -func (c *MemcacheClient) updateMemcacheServers() error { +func (c *memcachedClient) updateMemcacheServers() error { _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname) if err != nil { return err diff --git a/pkg/chunk/cache/memcached_test.go b/pkg/chunk/cache/memcached_test.go new file mode 100644 index 00000000000..028fba8ef46 --- /dev/null +++ b/pkg/chunk/cache/memcached_test.go @@ -0,0 +1,39 @@ +package cache_test + +import ( + "sync" + + "github.com/bradfitz/gomemcache/memcache" +) + +type mockMemcache struct { + sync.RWMutex + contents map[string][]byte +} + +func newMockMemcache() *mockMemcache { + return &mockMemcache{ + contents: map[string][]byte{}, + } +} + +func (m *mockMemcache) GetMulti(keys []string) (map[string]*memcache.Item, error) { + m.RLock() + defer m.RUnlock() + result := map[string]*memcache.Item{} + for _, k := range keys { + if c, ok := m.contents[k]; ok { + result[k] = &memcache.Item{ + Value: c, + } + } + } + return result, nil +} + +func (m *mockMemcache) Set(item *memcache.Item) error { + m.Lock() + defer m.Unlock() + m.contents[item.Key] = item.Value + return nil +} diff --git a/pkg/chunk/chunk_cache.go b/pkg/chunk/chunk_cache.go deleted file mode 100644 index 3c5ae9459bf..00000000000 --- a/pkg/chunk/chunk_cache.go +++ /dev/null @@ -1,231 +0,0 @@ -package chunk - -import ( - "context" - "flag" - "sync" - "time" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/go-kit/kit/log/level" - ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/instrument" - - "github.com/weaveworks/cortex/pkg/util" -) - -var ( - memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_requests_total", - Help: "Total count of chunks requested from memcache.", - }) - - memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_hits_total", - Help: "Total count of chunks found in memcache.", - }) - - memcacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_corrupt_chunks_total", - Help: "Total count of corrupt chunks found in memcache.", - }) - - memcacheDroppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_dropped_write_back", - Help: "Total count of dropped write backs to memcache.", - }) - - memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: "cortex", - Name: "memcache_request_duration_seconds", - Help: "Total time spent in seconds doing memcache requests.", - // Memecache requests are very quick: smallest bucket is 16us, biggest is 1s - Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), - }, []string{"method", "status_code"}) -) - -func init() { - prometheus.MustRegister(memcacheRequests) - prometheus.MustRegister(memcacheHits) - prometheus.MustRegister(memcacheCorrupt) - prometheus.MustRegister(memcacheDroppedWriteBack) - prometheus.MustRegister(memcacheRequestDuration) -} - -// Memcache caches things -type Memcache interface { - GetMulti(keys []string) (map[string]*memcache.Item, error) - Set(item *memcache.Item) error -} - -// CacheConfig is config to make a Cache -type CacheConfig struct { - Expiration time.Duration - WriteBackGoroutines int - WriteBackBuffer int - memcacheConfig MemcacheConfig -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) { - f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") - f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") - f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") - cfg.memcacheConfig.RegisterFlags(f) -} - -// Cache type caches chunks -type Cache struct { - cfg CacheConfig - memcache Memcache - - wg sync.WaitGroup - quit chan struct{} - bgWrites chan backgroundWrite -} - -type backgroundWrite struct { - key string - buf []byte -} - -// NewCache makes a new Cache -func NewCache(cfg CacheConfig) *Cache { - var memcache Memcache - if cfg.memcacheConfig.Host != "" { - memcache = NewMemcacheClient(cfg.memcacheConfig) - } - c := &Cache{ - cfg: cfg, - memcache: memcache, - quit: make(chan struct{}), - bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), - } - c.wg.Add(cfg.WriteBackGoroutines) - for i := 0; i < cfg.WriteBackGoroutines; i++ { - go c.writeBackLoop() - } - return c -} - -// Stop the background flushing goroutines. -func (c *Cache) Stop() { - close(c.quit) - c.wg.Wait() -} - -func memcacheStatusCode(err error) string { - // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables - switch err { - case nil: - return "200" - case memcache.ErrCacheMiss: - return "404" - case memcache.ErrMalformedKey: - return "400" - default: - return "500" - } -} - -// FetchChunkData gets chunks from the chunk cache. -func (c *Cache) FetchChunkData(ctx context.Context, chunks []Chunk) (found []Chunk, missing []Chunk, err error) { - sp, ctx := ot.StartSpanFromContext(ctx, "FetchChunkData") - defer sp.Finish() - sp.LogFields(otlog.Int("chunks requested", len(chunks))) - - if c.memcache == nil { - return nil, chunks, nil - } - - memcacheRequests.Add(float64(len(chunks))) - - keys := make([]string, 0, len(chunks)) - for _, chunk := range chunks { - keys = append(keys, chunk.ExternalKey()) - } - - var items map[string]*memcache.Item - err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { - var err error - items, err = c.memcache.GetMulti(keys) - return err - }) - if err != nil { - return nil, chunks, err - } - - sp.LogFields(otlog.Int("chunks returned", len(items))) - decodeContext := NewDecodeContext() - for i, externalKey := range keys { - item, ok := items[externalKey] - if !ok { - missing = append(missing, chunks[i]) - continue - } - - if err := chunks[i].Decode(decodeContext, item.Value); err != nil { - memcacheCorrupt.Inc() - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to decode chunk from cache", "err", err) - missing = append(missing, chunks[i]) - continue - } - - found = append(found, chunks[i]) - } - sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(missing))) - - memcacheHits.Add(float64(len(found))) - return found, missing, nil -} - -// StoreChunk serializes and stores a chunk in the chunk cache. -func (c *Cache) StoreChunk(ctx context.Context, key string, buf []byte) error { - if c.memcache == nil { - return nil - } - - return instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { - item := memcache.Item{ - Key: key, - Value: buf, - Expiration: int32(c.cfg.Expiration.Seconds()), - } - return c.memcache.Set(&item) - }) -} - -// BackgroundWrite writes chunks for the cache in the background -func (c *Cache) BackgroundWrite(key string, buf []byte) { - bgWrite := backgroundWrite{ - key: key, - buf: buf, - } - select { - case c.bgWrites <- bgWrite: - default: - memcacheDroppedWriteBack.Inc() - } -} - -func (c *Cache) writeBackLoop() { - defer c.wg.Done() - - for { - select { - case bgWrite := <-c.bgWrites: - err := c.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) - if err != nil { - level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err) - } - case <-c.quit: - return - } - } -} diff --git a/pkg/chunk/chunk_cache_test.go b/pkg/chunk/chunk_cache_test.go deleted file mode 100644 index 09144ce5826..00000000000 --- a/pkg/chunk/chunk_cache_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package chunk - -import ( - "math/rand" - "sync" - "testing" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" - "golang.org/x/net/context" -) - -type mockMemcache struct { - sync.RWMutex - contents map[string][]byte -} - -func newMockMemcache() *mockMemcache { - return &mockMemcache{ - contents: map[string][]byte{}, - } -} - -func (m *mockMemcache) GetMulti(keys []string) (map[string]*memcache.Item, error) { - m.RLock() - defer m.RUnlock() - result := map[string]*memcache.Item{} - for _, k := range keys { - if c, ok := m.contents[k]; ok { - result[k] = &memcache.Item{ - Value: c, - } - } - } - return result, nil -} - -func (m *mockMemcache) Set(item *memcache.Item) error { - m.Lock() - defer m.Unlock() - m.contents[item.Key] = item.Value - return nil -} - -func TestChunkCache(t *testing.T) { - c := Cache{ - memcache: newMockMemcache(), - } - - const ( - chunkLen = 13 * 3600 // in seconds - ) - - // put 100 chunks from 0 to 99 - keys := []string{} - chunks := []Chunk{} - for i := 0; i < 100; i++ { - ts := model.TimeFromUnix(int64(i * chunkLen)) - promChunk, _ := chunk.New().Add(model.SamplePair{ - Timestamp: ts, - Value: model.SampleValue(i), - }) - chunk := NewChunk( - userID, - model.Fingerprint(1), - model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - }, - promChunk[0], - ts, - ts.Add(chunkLen), - ) - - buf, err := chunk.Encode() - require.NoError(t, err) - - key := chunk.ExternalKey() - err = c.StoreChunk(context.Background(), key, buf) - require.NoError(t, err) - - keys = append(keys, key) - chunks = append(chunks, chunk) - } - - for i := 0; i < 100; i++ { - index := rand.Intn(len(keys)) - key := keys[index] - - chunk, err := parseExternalKey(userID, key) - require.NoError(t, err) - - found, missing, err := c.FetchChunkData(context.Background(), []Chunk{chunk}) - require.NoError(t, err) - require.Empty(t, missing) - require.Len(t, found, 1) - require.Equal(t, chunks[index], found[0]) - } - - // test getting them all - receivedChunks := []Chunk{} - for i := 0; i < len(keys); i++ { - chunk, err := parseExternalKey(userID, keys[i]) - require.NoError(t, err) - receivedChunks = append(receivedChunks, chunk) - } - found, missing, err := c.FetchChunkData(context.Background(), receivedChunks) - require.NoError(t, err) - require.Empty(t, missing) - require.Len(t, found, len(keys)) - require.Equal(t, chunks, receivedChunks) -} diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index ac421f86309..644f49be9b9 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/chunk/cache" "github.com/weaveworks/cortex/pkg/util" ) @@ -42,7 +43,7 @@ func init() { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { - CacheConfig + CacheConfig cache.Config // For injecting different schemas in tests. schemaFactory func(cfg SchemaConfig) Schema @@ -58,7 +59,7 @@ type Store struct { cfg StoreConfig storage StorageClient - cache *Cache + cache cache.Cache schema Schema } @@ -79,7 +80,7 @@ func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (* cfg: cfg, storage: storage, schema: schema, - cache: NewCache(cfg.CacheConfig), + cache: cache.New(cfg.CacheConfig), }, nil } @@ -196,15 +197,22 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim // Filter out chunks that are not in the selected time range. filtered := make([]Chunk, 0, len(chunks)) + keys := make([]string, 0, len(chunks)) for _, chunk := range chunks { if chunk.Through < from || through < chunk.From { continue } filtered = append(filtered, chunk) + keys = append(keys, chunk.ExternalKey()) } // Now fetch the actual chunk data from Memcache / S3 - fromCache, missing, err := c.cache.FetchChunkData(ctx, filtered) + cacheHits, cacheBufs, err := c.cache.FetchChunkData(ctx, keys) + if err != nil { + level.Warn(logger).Log("msg", "error fetching from cache", "err", err) + } + + fromCache, missing, err := ProcessCacheResponse(chunks, cacheHits, cacheBufs) if err != nil { level.Warn(logger).Log("msg", "error fetching from cache", "err", err) } @@ -240,6 +248,33 @@ outer: return filteredChunks, nil } +func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { + ctx := NewDecodeContext() + + for i, j := 0, 0; i < len(chunks) && j < len(keys); { + chunkKey := chunks[i].ExternalKey() + + if chunkKey < keys[j] { + missing = append(missing, chunks[i]) + i++ + } else if chunkKey > keys[j] { + // Got a chunk response we shouldn't have + j++ + } else { + chunk := chunks[i] + err = chunk.Decode(ctx, bufs[j]) + if err != nil { + return + } + found = append(found, chunk) + i++ + j++ + } + } + + return +} + func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) { // Get all series from the index userID, err := user.ExtractOrgID(ctx) @@ -466,13 +501,15 @@ func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []Index return unique(chunkSet), nil } -func (c *Store) writeBackCache(_ context.Context, chunks []Chunk) error { +func (c *Store) writeBackCache(ctx context.Context, chunks []Chunk) error { for i := range chunks { encoded, err := chunks[i].Encode() if err != nil { return err } - c.cache.BackgroundWrite(chunks[i].ExternalKey(), encoded) + if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil { + return err + } } return nil } From 1f458e5ecc41418b84a4dd4966ac0f217e62ff0a Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 5 Feb 2018 15:39:36 +0100 Subject: [PATCH 02/12] Add diskcache, for caching chunks on local SSD in queriers. - mmap a large file, treat it as a series of 2KB buckets. - Use FNV hash to place key and chunk in buckets. - Use existing memcached tests --- pkg/chunk/cache/diskcache.go | 158 +++++++++++++++++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 pkg/chunk/cache/diskcache.go diff --git a/pkg/chunk/cache/diskcache.go b/pkg/chunk/cache/diskcache.go new file mode 100644 index 00000000000..ee48fec9c86 --- /dev/null +++ b/pkg/chunk/cache/diskcache.go @@ -0,0 +1,158 @@ +package cache + +import ( + "context" + "encoding/binary" + "flag" + "fmt" + "hash/fnv" + "os" + "sync" + + "github.com/pkg/errors" + "github.com/prometheus/tsdb/fileutil" + "golang.org/x/sys/unix" +) + +// TODO: in the future we could cuckoo hash or linear probe. + +// Buckets contain key (~50), chunks (1024) and their metadata (~100) +const bucketSize = 2048 + +// DiskcacheConfig for the Disk cache. +type DiskcacheConfig struct { + Path string + Size int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Path, "diskcache.path", "/var/run/chunks", "Path to file used to cache chunks.") + f.IntVar(&cfg.Size, "diskcache.size", 1024*1024*1024, "Size of file (bytes)") +} + +// Diskcache is an on-disk chunk cache. +type Diskcache struct { + mtx sync.RWMutex + f *os.File + buckets uint32 + buf []byte +} + +// NewDiskcache creates a new on-disk cache. +func NewDiskcache(cfg DiskcacheConfig) (*Diskcache, error) { + f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, errors.Wrap(err, "open") + } + + if err := fileutil.Preallocate(f, int64(cfg.Size), true); err != nil { + return nil, errors.Wrap(err, "preallocate") + } + + info, err := f.Stat() + if err != nil { + return nil, errors.Wrap(err, "stat") + } + + buf, err := unix.Mmap(int(f.Fd()), 0, int(info.Size()), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) + if err != nil { + f.Close() + return nil, err + } + + buckets := len(buf) / bucketSize + + return &Diskcache{ + f: f, + buf: buf, + buckets: uint32(buckets), + }, nil +} + +// Stop closes the file. +func (d *Diskcache) Stop() error { + if err := unix.Munmap(d.buf); err != nil { + return err + } + return d.f.Close() +} + +// FetchChunkData get chunks from the cache. +func (d *Diskcache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) { + for _, key := range keys { + buf, ok := d.fetch(key) + if ok { + found = append(found, key) + bufs = append(bufs, buf) + } + } + return +} + +func (d *Diskcache) fetch(key string) ([]byte, bool) { + d.mtx.RLock() + defer d.mtx.RUnlock() + + bucket := hash(key) % d.buckets + buf := d.buf[bucket*bucketSize : (bucket+1)*bucketSize] + + existingKey, n, ok := get(buf, 0) + if !ok || string(existingKey) != key { + return nil, false + } + + existingValue, _, ok := get(buf, n) + if !ok { + return nil, false + } + + result := make([]byte, len(existingValue), len(existingValue)) + copy(result, existingValue) + return result, true +} + +// StoreChunk puts a chunk into the cache. +func (d *Diskcache) StoreChunk(ctx context.Context, key string, value []byte) error { + d.mtx.Lock() + defer d.mtx.Unlock() + + bucket := hash(key) % d.buckets + buf := d.buf[bucket*bucketSize : (bucket+1)*bucketSize] + + n, err := put([]byte(key), buf, 0) + if err != nil { + return err + } + + _, err = put(value, buf, n) + if err != nil { + return err + } + + return nil +} + +func put(value []byte, buf []byte, n int) (int, error) { + if len(value)+n+4 > len(buf) { + return 0, errors.Wrap(fmt.Errorf("value too big: %d > %d", len(value), len(buf)), "put") + } + m := binary.PutUvarint(buf[n:], uint64(len(value))) + copy(buf[n+m:], value) + return len(value) + n + m, nil +} + +func get(buf []byte, n int) ([]byte, int, bool) { + size, m := binary.Uvarint(buf[n:]) + end := n + m + int(size) + if end > len(buf) { + return nil, 0, false + } + return buf[n+m : end], end, true +} + +func hash(key string) uint32 { + h := fnv.New32() + h.Write([]byte(key)) + return h.Sum32() +} From 2c55d1f85b4c93d3386b87e35a46b14226ca4536 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 5 Feb 2018 18:40:18 +0000 Subject: [PATCH 03/12] Plumb in diskcache. --- pkg/chunk/cache/cache.go | 67 +++++++++++++++++++++++++++++++--------- pkg/chunk/chunk_store.go | 7 ++++- 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index ac5ceb6face..0d9efb59639 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -20,29 +20,42 @@ type Cache interface { type Config struct { WriteBackGoroutines int WriteBackBuffer int + EnableDiskcache bool memcache MemcachedConfig memcacheClient MemcachedClientConfig - - disk DiskcacheConfig + diskcache DiskcacheConfig } +// RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache") f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") cfg.memcache.RegisterFlags(f) cfg.memcacheClient.RegisterFlags(f) - cfg.disk.RegisterFlags(f) + cfg.diskcache.RegisterFlags(f) } -func New(cfg Config) Cache { - if cfg.memcacheClient.Host == "" { - return noopCache{} +// New creates a new Cache using Config. +func New(cfg Config) (Cache, error) { + caches := []Cache{} + + if cfg.memcacheClient.Host != "" { + client := newMemcachedClient(cfg.memcacheClient) + caches = append(caches, NewMemcached(cfg.memcache, client)) } - client := newMemcachedClient(cfg.memcacheClient) - return NewMemcached(cfg.memcache, client) + if cfg.EnableDiskcache { + cache, err := NewDiskcache(cfg.diskcache) + if err != nil { + return nil, err + } + caches = append(caches, cache) + } + + return multiCache(caches), nil } type backgroundCache struct { @@ -58,8 +71,15 @@ type backgroundWrite struct { buf []byte } -func NewBackground(cfg Config) Cache { +// NewBackground returns a new Cache that does stores on background goroutines. +func NewBackground(cfg Config) (Cache, error) { + cache, err := New(cfg) + if err != nil { + return nil, err + } + c := &backgroundCache{ + Cache: cache, quit: make(chan struct{}), bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), } @@ -69,7 +89,7 @@ func NewBackground(cfg Config) Cache { go c.writeBackLoop() } - return c + return c, nil } // Stop the background flushing goroutines. @@ -80,7 +100,7 @@ func (c *backgroundCache) Stop() error { return c.Cache.Stop() } -// BackgroundWrite writes chunks for the cache in the background +// StoreChunk writes chunks for the cache in the background. func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error { bgWrite := backgroundWrite{ key: key, @@ -110,16 +130,33 @@ func (c *backgroundCache) writeBackLoop() { } } -type noopCache struct{} +type multiCache []Cache -func (noopCache) StoreChunk(ctx context.Context, key string, buf []byte) error { +func (m multiCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + for _, c := range []Cache(m) { + if err := c.StoreChunk(ctx, key, buf); err != nil { + return err + } + } return nil } -func (noopCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) { +func (m multiCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, error) { + for _, c := range []Cache(m) { + found, bufs, err := c.FetchChunkData(ctx, keys) + if err != nil { + return nil, nil, err + } + return found, bufs, nil + } return nil, nil, nil } -func (noopCache) Stop() error { +func (m multiCache) Stop() error { + for _, c := range []Cache(m) { + if err := c.Stop(); err != nil { + return err + } + } return nil } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 644f49be9b9..b97a7be9356 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -76,11 +76,16 @@ func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (* return nil, err } + cache, err := cache.NewBackground(cfg.CacheConfig) + if err != nil { + return nil, err + } + return &Store{ cfg: cfg, storage: storage, schema: schema, - cache: cache.New(cfg.CacheConfig), + cache: cache, }, nil } From db422ce7981b4701e529666cb8d8997e42affdb2 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 17:31:19 +0000 Subject: [PATCH 04/12] Fix test failure and lint. --- pkg/chunk/cache/memcached.go | 1 + pkg/chunk/chunk_store.go | 13 ++++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go index 3254d6010f0..6107cdd0c7d 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -135,6 +135,7 @@ func (c *Memcached) StoreChunk(ctx context.Context, key string, buf []byte) erro }) } +// Stop does nothing. func (*Memcached) Stop() error { return nil } diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index b97a7be9356..3f53a97f09f 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -217,7 +217,7 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim level.Warn(logger).Log("msg", "error fetching from cache", "err", err) } - fromCache, missing, err := ProcessCacheResponse(chunks, cacheHits, cacheBufs) + fromCache, missing, err := ProcessCacheResponse(filtered, cacheHits, cacheBufs) if err != nil { level.Warn(logger).Log("msg", "error fetching from cache", "err", err) } @@ -253,17 +253,20 @@ outer: return filteredChunks, nil } +// ProcessCacheResponse decodes the chunks coming back from the cache, separating +// hits and misses. func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { ctx := NewDecodeContext() - for i, j := 0, 0; i < len(chunks) && j < len(keys); { + i, j := 0, 0 + for i < len(chunks) && j < len(keys) { chunkKey := chunks[i].ExternalKey() if chunkKey < keys[j] { missing = append(missing, chunks[i]) i++ } else if chunkKey > keys[j] { - // Got a chunk response we shouldn't have + level.Debug(util.Logger).Log("msg", "got chunk from cache we didn't ask for") j++ } else { chunk := chunks[i] @@ -277,6 +280,10 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [ } } + for ; i < len(chunks); i++ { + missing = append(missing, chunks[i]) + } + return } From 4c0bfe4c818c0ae1156909b7b7895f0eb82815d2 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 17:56:28 +0000 Subject: [PATCH 05/12] Make the hierarchical cache work. --- pkg/chunk/cache/cache.go | 34 ++++++++++++++++++++++++++++------ pkg/chunk/cache/cache_test.go | 9 ++++++--- pkg/chunk/cache/diskcache.go | 4 +++- pkg/chunk/cache/memcached.go | 4 +++- pkg/chunk/chunk_store.go | 2 +- 5 files changed, 41 insertions(+), 12 deletions(-) diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index 0d9efb59639..a1afa1a660c 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -12,7 +12,7 @@ import ( // Cache byte arrays by key. type Cache interface { StoreChunk(ctx context.Context, key string, buf []byte) error - FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) + FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) Stop() error } @@ -141,15 +141,37 @@ func (m multiCache) StoreChunk(ctx context.Context, key string, buf []byte) erro return nil } -func (m multiCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, error) { +func (m multiCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { + found := make(map[string][]byte, len(keys)) + missing := keys + for _, c := range []Cache(m) { - found, bufs, err := c.FetchChunkData(ctx, keys) + var ( + err error + passKeys []string + passBufs [][]byte + ) + + passKeys, passBufs, missing, err = c.FetchChunkData(ctx, missing) if err != nil { - return nil, nil, err + return nil, nil, nil, err + } + + for i, key := range passKeys { + found[key] = passBufs[i] } - return found, bufs, nil } - return nil, nil, nil + + resultKeys := make([]string, 0, len(found)) + resultBufs := make([][]byte, 0, len(found)) + for _, key := range keys { + if buf, ok := found[key]; ok { + resultKeys = append(resultKeys, key) + resultBufs = append(resultBufs, buf) + } + } + + return resultKeys, resultBufs, missing, nil } func (m multiCache) Stop() error { diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index 8ecd041a984..b0df014dd23 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -61,10 +61,11 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch index := rand.Intn(len(keys)) key := keys[index] - found, bufs, err := cache.FetchChunkData(context.Background(), []string{key}) + found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), []string{key}) require.NoError(t, err) require.Len(t, found, 1) require.Len(t, bufs, 1) + require.Len(t, missingKeys, 0) foundChunks, missing, err := chunk.ProcessCacheResponse([]chunk.Chunk{chunks[index]}, found, bufs) require.NoError(t, err) @@ -75,10 +76,11 @@ func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []ch func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) { // test getting them all - found, bufs, err := cache.FetchChunkData(context.Background(), keys) + found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), keys) require.NoError(t, err) require.Len(t, found, len(keys)) require.Len(t, bufs, len(keys)) + require.Len(t, missingKeys, 0) foundChunks, missing, err := chunk.ProcessCacheResponse(chunks, found, bufs) require.NoError(t, err) @@ -89,10 +91,11 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks [] func testCacheMiss(t *testing.T, cache cache.Cache) { for i := 0; i < 100; i++ { key := strconv.Itoa(rand.Int()) - found, bufs, err := cache.FetchChunkData(context.Background(), []string{key}) + found, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{key}) require.NoError(t, err) require.Empty(t, found) require.Empty(t, bufs) + require.Len(t, missing, 1) } } diff --git a/pkg/chunk/cache/diskcache.go b/pkg/chunk/cache/diskcache.go index ee48fec9c86..d32683aa40a 100644 --- a/pkg/chunk/cache/diskcache.go +++ b/pkg/chunk/cache/diskcache.go @@ -79,12 +79,14 @@ func (d *Diskcache) Stop() error { } // FetchChunkData get chunks from the cache. -func (d *Diskcache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) { +func (d *Diskcache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { for _, key := range keys { buf, ok := d.fetch(key) if ok { found = append(found, key) bufs = append(bufs, buf) + } else { + missed = append(missed, key) } } return diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go index 6107cdd0c7d..c07d40f58f2 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -94,7 +94,7 @@ func memcacheStatusCode(err error) string { } // FetchChunkData gets chunks from the chunk cache. -func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, err error) { +func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { sp, ctx := ot.StartSpanFromContext(ctx, "FetchChunkData") defer sp.Finish() sp.LogFields(otlog.Int("chunks requested", len(keys))) @@ -116,6 +116,8 @@ func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found [] if ok { found = append(found, key) bufs = append(bufs, item.Value) + } else { + missed = append(missed, key) } } sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found))) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 3f53a97f09f..34bdebfc5f7 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -212,7 +212,7 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim } // Now fetch the actual chunk data from Memcache / S3 - cacheHits, cacheBufs, err := c.cache.FetchChunkData(ctx, keys) + cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) if err != nil { level.Warn(logger).Log("msg", "error fetching from cache", "err", err) } From 7505107f76469c5e1b789490a09eb85b7dcc8f91 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 18:38:40 +0000 Subject: [PATCH 06/12] Add instrumentation middleware for caches. --- pkg/chunk/cache/background.go | 109 ++++++++++++++++++++++++ pkg/chunk/cache/cache.go | 145 +++----------------------------- pkg/chunk/cache/instrumented.go | 86 +++++++++++++++++++ pkg/chunk/cache/memcached.go | 45 +--------- pkg/chunk/cache/tiered.go | 56 ++++++++++++ pkg/chunk/chunk_store.go | 9 +- 6 files changed, 272 insertions(+), 178 deletions(-) create mode 100644 pkg/chunk/cache/background.go create mode 100644 pkg/chunk/cache/instrumented.go create mode 100644 pkg/chunk/cache/tiered.go diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go new file mode 100644 index 00000000000..28424b767bf --- /dev/null +++ b/pkg/chunk/cache/background.go @@ -0,0 +1,109 @@ +package cache + +import ( + "context" + "flag" + "sync" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + droppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_dropped_background_writes_total", + Help: "Total count of dropped write backs to cache.", + }) + queueLength = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "cache_background_queue_length", + Help: "Length of the cache background write queue.", + }) +) + +func init() { + prometheus.MustRegister(droppedWriteBack) + prometheus.MustRegister(queueLength) +} + +type backgroundConfig struct { + WriteBackGoroutines int + WriteBackBuffer int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *backgroundConfig) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") + f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") +} + +type backgroundCache struct { + Cache + + wg sync.WaitGroup + quit chan struct{} + bgWrites chan backgroundWrite +} + +type backgroundWrite struct { + key string + buf []byte +} + +// newBackground returns a new Cache that does stores on background goroutines. +func newBackground(cfg backgroundConfig, cache Cache) Cache { + c := &backgroundCache{ + Cache: cache, + quit: make(chan struct{}), + bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), + } + + c.wg.Add(cfg.WriteBackGoroutines) + for i := 0; i < cfg.WriteBackGoroutines; i++ { + go c.writeBackLoop() + } + + return c +} + +// Stop the background flushing goroutines. +func (c *backgroundCache) Stop() error { + close(c.quit) + c.wg.Wait() + + return c.Cache.Stop() +} + +// StoreChunk writes chunks for the cache in the background. +func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + bgWrite := backgroundWrite{ + key: key, + buf: buf, + } + select { + case c.bgWrites <- bgWrite: + queueLength.Inc() + default: + droppedWriteBack.Inc() + } + return nil +} + +func (c *backgroundCache) writeBackLoop() { + defer c.wg.Done() + + for { + select { + case bgWrite := <-c.bgWrites: + queueLength.Dec() + err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) + if err != nil { + level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err) + } + case <-c.quit: + return + } + } +} diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index a1afa1a660c..80dd719a8d9 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -3,10 +3,6 @@ package cache import ( "context" "flag" - "sync" - - "github.com/go-kit/kit/log/level" - "github.com/weaveworks/cortex/pkg/util" ) // Cache byte arrays by key. @@ -18,10 +14,9 @@ type Cache interface { // Config for building Caches. type Config struct { - WriteBackGoroutines int - WriteBackBuffer int - EnableDiskcache bool + EnableDiskcache bool + background backgroundConfig memcache MemcachedConfig memcacheClient MemcachedClientConfig diskcache DiskcacheConfig @@ -30,8 +25,6 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache") - f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") - f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") cfg.memcache.RegisterFlags(f) cfg.memcacheClient.RegisterFlags(f) @@ -44,7 +37,8 @@ func New(cfg Config) (Cache, error) { if cfg.memcacheClient.Host != "" { client := newMemcachedClient(cfg.memcacheClient) - caches = append(caches, NewMemcached(cfg.memcache, client)) + cache := NewMemcached(cfg.memcache, client) + caches = append(caches, instrument("memcache", cache)) } if cfg.EnableDiskcache { @@ -52,133 +46,14 @@ func New(cfg Config) (Cache, error) { if err != nil { return nil, err } - caches = append(caches, cache) - } - - return multiCache(caches), nil -} - -type backgroundCache struct { - Cache - - wg sync.WaitGroup - quit chan struct{} - bgWrites chan backgroundWrite -} - -type backgroundWrite struct { - key string - buf []byte -} - -// NewBackground returns a new Cache that does stores on background goroutines. -func NewBackground(cfg Config) (Cache, error) { - cache, err := New(cfg) - if err != nil { - return nil, err - } - - c := &backgroundCache{ - Cache: cache, - quit: make(chan struct{}), - bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), - } - - c.wg.Add(cfg.WriteBackGoroutines) - for i := 0; i < cfg.WriteBackGoroutines; i++ { - go c.writeBackLoop() - } - - return c, nil -} - -// Stop the background flushing goroutines. -func (c *backgroundCache) Stop() error { - close(c.quit) - c.wg.Wait() - - return c.Cache.Stop() -} - -// StoreChunk writes chunks for the cache in the background. -func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error { - bgWrite := backgroundWrite{ - key: key, - buf: buf, - } - select { - case c.bgWrites <- bgWrite: - default: - memcacheDroppedWriteBack.Inc() - } - return nil -} - -func (c *backgroundCache) writeBackLoop() { - defer c.wg.Done() - - for { - select { - case bgWrite := <-c.bgWrites: - err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) - if err != nil { - level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err) - } - case <-c.quit: - return - } + caches = append(caches, instrument("diskcache", cache)) } -} - -type multiCache []Cache -func (m multiCache) StoreChunk(ctx context.Context, key string, buf []byte) error { - for _, c := range []Cache(m) { - if err := c.StoreChunk(ctx, key, buf); err != nil { - return err - } + var cache Cache = tiered(caches) + if len(caches) > 1 { + cache = instrument("tiered", cache) } - return nil -} - -func (m multiCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { - found := make(map[string][]byte, len(keys)) - missing := keys - for _, c := range []Cache(m) { - var ( - err error - passKeys []string - passBufs [][]byte - ) - - passKeys, passBufs, missing, err = c.FetchChunkData(ctx, missing) - if err != nil { - return nil, nil, nil, err - } - - for i, key := range passKeys { - found[key] = passBufs[i] - } - } - - resultKeys := make([]string, 0, len(found)) - resultBufs := make([][]byte, 0, len(found)) - for _, key := range keys { - if buf, ok := found[key]; ok { - resultKeys = append(resultKeys, key) - resultBufs = append(resultBufs, buf) - } - } - - return resultKeys, resultBufs, missing, nil -} - -func (m multiCache) Stop() error { - for _, c := range []Cache(m) { - if err := c.Stop(); err != nil { - return err - } - } - return nil + cache = newBackground(cfg.background, cache) + return cache, nil } diff --git a/pkg/chunk/cache/instrumented.go b/pkg/chunk/cache/instrumented.go new file mode 100644 index 00000000000..3d664cc35c8 --- /dev/null +++ b/pkg/chunk/cache/instrumented.go @@ -0,0 +1,86 @@ +package cache + +import ( + "context" + + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/client_golang/prometheus" + instr "github.com/weaveworks/common/instrument" +) + +var ( + requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "cache_request_duration_seconds", + Help: "Total time spent in seconds doing cache requests.", + // Cache requests are very quick: smallest bucket is 16us, biggest is 1s. + Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), + }, []string{"method", "status_code"}) + + fetchedKeys = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_fetched_keys", + Help: "Total count of chunks requested from memcache.", + }) + + hits = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_hits", + Help: "Total count of chunks found in memcache.", + }) +) + +func init() { + prometheus.MustRegister(requestDuration) + prometheus.MustRegister(fetchedKeys) + prometheus.MustRegister(hits) +} + +func instrument(name string, cache Cache) Cache { + return &instrumentedCache{ + name: name, + Cache: cache, + } +} + +type instrumentedCache struct { + name string + Cache +} + +func (i *instrumentedCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + return instr.TimeRequestHistogram(ctx, i.name+".store", requestDuration, func(ctx context.Context) error { + return i.Cache.StoreChunk(ctx, key, buf) + }) +} + +func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { + var ( + found []string + bufs [][]byte + missing []string + ) + err := instr.TimeRequestHistogram(ctx, i.name+".fetch", requestDuration, func(ctx context.Context) error { + sp := ot.SpanFromContext(ctx) + sp.LogFields(otlog.Int("chunks requested", len(keys))) + + var err error + found, bufs, missing, err = i.Cache.FetchChunkData(ctx, keys) + + if err == nil { + sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found))) + } else { + sp.LogFields(otlog.Error(err)) + } + + return err + }) + fetchedKeys.Add(float64(len(keys))) + hits.Add(float64(len(found))) + return found, bufs, missing, err +} + +func (i *instrumentedCache) Stop() error { + return i.Cache.Stop() +} diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go index c07d40f58f2..7cfb33d4c71 100644 --- a/pkg/chunk/cache/memcached.go +++ b/pkg/chunk/cache/memcached.go @@ -6,37 +6,11 @@ import ( "time" "github.com/bradfitz/gomemcache/memcache" - ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/instrument" + instr "github.com/weaveworks/common/instrument" ) var ( - memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_requests_total", - Help: "Total count of chunks requested from memcache.", - }) - - memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_hits_total", - Help: "Total count of chunks found in memcache.", - }) - - memcacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_corrupt_chunks_total", - Help: "Total count of corrupt chunks found in memcache.", - }) - - memcacheDroppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_dropped_write_back", - Help: "Total count of dropped write backs to memcache.", - }) - memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "memcache_request_duration_seconds", @@ -47,10 +21,6 @@ var ( ) func init() { - prometheus.MustRegister(memcacheRequests) - prometheus.MustRegister(memcacheHits) - prometheus.MustRegister(memcacheCorrupt) - prometheus.MustRegister(memcacheDroppedWriteBack) prometheus.MustRegister(memcacheRequestDuration) } @@ -95,13 +65,8 @@ func memcacheStatusCode(err error) string { // FetchChunkData gets chunks from the chunk cache. func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { - sp, ctx := ot.StartSpanFromContext(ctx, "FetchChunkData") - defer sp.Finish() - sp.LogFields(otlog.Int("chunks requested", len(keys))) - memcacheRequests.Add(float64(len(keys))) - var items map[string]*memcache.Item - err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { var err error items, err = c.memcache.GetMulti(keys) return err @@ -109,8 +74,6 @@ func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found [] if err != nil { return } - sp.LogFields(otlog.Int("chunks returned", len(items))) - for _, key := range keys { item, ok := items[key] if ok { @@ -120,14 +83,12 @@ func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found [] missed = append(missed, key) } } - sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found))) - memcacheHits.Add(float64(len(found))) return } // StoreChunk serializes and stores a chunk in the chunk cache. func (c *Memcached) StoreChunk(ctx context.Context, key string, buf []byte) error { - return instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + return instr.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { item := memcache.Item{ Key: key, Value: buf, diff --git a/pkg/chunk/cache/tiered.go b/pkg/chunk/cache/tiered.go new file mode 100644 index 00000000000..067b0541341 --- /dev/null +++ b/pkg/chunk/cache/tiered.go @@ -0,0 +1,56 @@ +package cache + +import "context" + +type tiered []Cache + +func (t tiered) StoreChunk(ctx context.Context, key string, buf []byte) error { + for _, c := range []Cache(t) { + if err := c.StoreChunk(ctx, key, buf); err != nil { + return err + } + } + return nil +} + +func (t tiered) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { + found := make(map[string][]byte, len(keys)) + missing := keys + + for _, c := range []Cache(t) { + var ( + err error + passKeys []string + passBufs [][]byte + ) + + passKeys, passBufs, missing, err = c.FetchChunkData(ctx, missing) + if err != nil { + return nil, nil, nil, err + } + + for i, key := range passKeys { + found[key] = passBufs[i] + } + } + + resultKeys := make([]string, 0, len(found)) + resultBufs := make([][]byte, 0, len(found)) + for _, key := range keys { + if buf, ok := found[key]; ok { + resultKeys = append(resultKeys, key) + resultBufs = append(resultBufs, buf) + } + } + + return resultKeys, resultBufs, missing, nil +} + +func (t tiered) Stop() error { + for _, c := range []Cache(t) { + if err := c.Stop(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 34bdebfc5f7..aae93d92a69 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -34,11 +34,17 @@ var ( }, HashBuckets: 1024, }) + cacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_corrupt_chunks_total", + Help: "Total count of corrupt chunks found in memcache.", + }) ) func init() { prometheus.MustRegister(indexEntriesPerChunk) prometheus.MustRegister(rowWrites) + prometheus.MustRegister(cacheCorrupt) } // StoreConfig specifies config for a ChunkStore @@ -76,7 +82,7 @@ func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (* return nil, err } - cache, err := cache.NewBackground(cfg.CacheConfig) + cache, err := cache.New(cfg.CacheConfig) if err != nil { return nil, err } @@ -272,6 +278,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [ chunk := chunks[i] err = chunk.Decode(ctx, bufs[j]) if err != nil { + cacheCorrupt.Inc() return } found = append(found, chunk) From dbb495ef55001b6a931b1eb310545cc880cd7c6d Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 21:16:27 +0000 Subject: [PATCH 07/12] Add tests for background and tiered cache. --- pkg/chunk/cache/background.go | 9 ++--- pkg/chunk/cache/background_test.go | 54 ++++++++++++++++++++++++++++++ pkg/chunk/cache/cache.go | 4 +-- pkg/chunk/cache/tiered.go | 5 +++ pkg/chunk/cache/tiered_test.go | 37 ++++++++++++++++++++ 5 files changed, 103 insertions(+), 6 deletions(-) create mode 100644 pkg/chunk/cache/background_test.go create mode 100644 pkg/chunk/cache/tiered_test.go diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go index 28424b767bf..b13543429cb 100644 --- a/pkg/chunk/cache/background.go +++ b/pkg/chunk/cache/background.go @@ -28,13 +28,14 @@ func init() { prometheus.MustRegister(queueLength) } -type backgroundConfig struct { +// BackgroundConfig is config for a Background Cache. +type BackgroundConfig struct { WriteBackGoroutines int WriteBackBuffer int } // RegisterFlags adds the flags required to config this to the given FlagSet. -func (cfg *backgroundConfig) RegisterFlags(f *flag.FlagSet) { +func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") } @@ -52,8 +53,8 @@ type backgroundWrite struct { buf []byte } -// newBackground returns a new Cache that does stores on background goroutines. -func newBackground(cfg backgroundConfig, cache Cache) Cache { +// NewBackground returns a new Cache that does stores on background goroutines. +func NewBackground(cfg BackgroundConfig, cache Cache) Cache { c := &backgroundCache{ Cache: cache, quit: make(chan struct{}), diff --git a/pkg/chunk/cache/background_test.go b/pkg/chunk/cache/background_test.go new file mode 100644 index 00000000000..67301d7ab9f --- /dev/null +++ b/pkg/chunk/cache/background_test.go @@ -0,0 +1,54 @@ +package cache_test + +import ( + "context" + "sync" + "testing" + + "github.com/weaveworks/cortex/pkg/chunk/cache" +) + +type mockCache struct { + sync.Mutex + cache map[string][]byte +} + +func (m *mockCache) StoreChunk(_ context.Context, key string, buf []byte) error { + m.Lock() + defer m.Unlock() + m.cache[key] = buf + return nil +} + +func (m *mockCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { + m.Lock() + defer m.Unlock() + for _, key := range keys { + buf, ok := m.cache[key] + if ok { + found = append(found, key) + bufs = append(bufs, buf) + } else { + missing = append(missing, key) + } + } + return +} + +func (m *mockCache) Stop() error { + return nil +} + +func newMockCache() cache.Cache { + return &mockCache{ + cache: map[string][]byte{}, + } +} + +func TestBackground(t *testing.T) { + cache := cache.NewBackground(cache.BackgroundConfig{ + WriteBackGoroutines: 1, + WriteBackBuffer: 100, + }, newMockCache()) + testCache(t, cache) +} diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index 80dd719a8d9..032062ee2c8 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -16,7 +16,7 @@ type Cache interface { type Config struct { EnableDiskcache bool - background backgroundConfig + background BackgroundConfig memcache MemcachedConfig memcacheClient MemcachedClientConfig diskcache DiskcacheConfig @@ -54,6 +54,6 @@ func New(cfg Config) (Cache, error) { cache = instrument("tiered", cache) } - cache = newBackground(cfg.background, cache) + cache = NewBackground(cfg.background, cache) return cache, nil } diff --git a/pkg/chunk/cache/tiered.go b/pkg/chunk/cache/tiered.go index 067b0541341..f2328ed28b5 100644 --- a/pkg/chunk/cache/tiered.go +++ b/pkg/chunk/cache/tiered.go @@ -4,6 +4,11 @@ import "context" type tiered []Cache +// NewTiered makes a new tiered cache. +func NewTiered(caches []Cache) Cache { + return tiered(caches) +} + func (t tiered) StoreChunk(ctx context.Context, key string, buf []byte) error { for _, c := range []Cache(t) { if err := c.StoreChunk(ctx, key, buf); err != nil { diff --git a/pkg/chunk/cache/tiered_test.go b/pkg/chunk/cache/tiered_test.go new file mode 100644 index 00000000000..32cc3b6bc46 --- /dev/null +++ b/pkg/chunk/cache/tiered_test.go @@ -0,0 +1,37 @@ +package cache_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk/cache" +) + +func TestTieredSimple(t *testing.T) { + for i := 1; i < 10; i++ { + caches := []cache.Cache{} + for j := 0; j <= i; j++ { + caches = append(caches, newMockCache()) + } + cache := cache.NewTiered(caches) + testCache(t, cache) + } +} + +func TestTiered(t *testing.T) { + level1, level2 := newMockCache(), newMockCache() + cache := cache.NewTiered([]cache.Cache{level1, level2}) + + err := level1.StoreChunk(context.Background(), "key1", []byte("hello")) + require.NoError(t, err) + + err = level2.StoreChunk(context.Background(), "key2", []byte("world")) + require.NoError(t, err) + + keys, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{"key1", "key2", "key3"}) + require.NoError(t, err) + require.Equal(t, []string{"key1", "key2"}, keys) + require.Equal(t, [][]byte{[]byte("hello"), []byte("world")}, bufs) + require.Equal(t, []string{"key3"}, missing) +} From 274de67d66967f179080d2271fc0b0719428db84 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 6 Feb 2018 21:37:55 +0000 Subject: [PATCH 08/12] Ensure the background goroutines have flushed before checking the cache for entries. --- pkg/chunk/cache/background.go | 5 ++++- pkg/chunk/cache/background_extra_test.go | 7 +++++++ pkg/chunk/cache/background_test.go | 10 ++++++++-- 3 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 pkg/chunk/cache/background_extra_test.go diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go index b13543429cb..2560cc49a7e 100644 --- a/pkg/chunk/cache/background.go +++ b/pkg/chunk/cache/background.go @@ -97,7 +97,10 @@ func (c *backgroundCache) writeBackLoop() { for { select { - case bgWrite := <-c.bgWrites: + case bgWrite, ok := <-c.bgWrites: + if !ok { + return + } queueLength.Dec() err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) if err != nil { diff --git a/pkg/chunk/cache/background_extra_test.go b/pkg/chunk/cache/background_extra_test.go new file mode 100644 index 00000000000..07aa21c847f --- /dev/null +++ b/pkg/chunk/cache/background_extra_test.go @@ -0,0 +1,7 @@ +package cache + +func Flush(c Cache) { + b := c.(*backgroundCache) + close(b.bgWrites) + b.wg.Wait() +} diff --git a/pkg/chunk/cache/background_test.go b/pkg/chunk/cache/background_test.go index 67301d7ab9f..ebf540abd75 100644 --- a/pkg/chunk/cache/background_test.go +++ b/pkg/chunk/cache/background_test.go @@ -46,9 +46,15 @@ func newMockCache() cache.Cache { } func TestBackground(t *testing.T) { - cache := cache.NewBackground(cache.BackgroundConfig{ + c := cache.NewBackground(cache.BackgroundConfig{ WriteBackGoroutines: 1, WriteBackBuffer: 100, }, newMockCache()) - testCache(t, cache) + + keys, chunks := fillCache(t, c) + cache.Flush(c) + + testCacheSingle(t, c, keys, chunks) + testCacheMultiple(t, c, keys, chunks) + testCacheMiss(t, c) } From faec74c3c7edb19298be1c0a7e1061886efa939e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 7 Feb 2018 10:36:13 +0000 Subject: [PATCH 09/12] Breakdown cache hit rate by cache name. --- pkg/chunk/cache/instrumented.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/chunk/cache/instrumented.go b/pkg/chunk/cache/instrumented.go index 3d664cc35c8..f0104f4d734 100644 --- a/pkg/chunk/cache/instrumented.go +++ b/pkg/chunk/cache/instrumented.go @@ -18,17 +18,17 @@ var ( Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), }, []string{"method", "status_code"}) - fetchedKeys = prometheus.NewCounter(prometheus.CounterOpts{ + fetchedKeys = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_fetched_keys", Help: "Total count of chunks requested from memcache.", - }) + }, []string{"name"}) - hits = prometheus.NewCounter(prometheus.CounterOpts{ + hits = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_hits", Help: "Total count of chunks found in memcache.", - }) + }, []string{"name"}) ) func init() { @@ -39,13 +39,16 @@ func init() { func instrument(name string, cache Cache) Cache { return &instrumentedCache{ - name: name, - Cache: cache, + name: name, + fetchedKeys: fetchedKeys.WithLabelValues(name), + hits: hits.WithLabelValues(name), + Cache: cache, } } type instrumentedCache struct { - name string + name string + fetchedKeys, hits prometheus.Counter Cache } @@ -76,8 +79,8 @@ func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) ( return err }) - fetchedKeys.Add(float64(len(keys))) - hits.Add(float64(len(found))) + i.fetchedKeys.Add(float64(len(keys))) + i.hits.Add(float64(len(found))) return found, bufs, missing, err } From 0194779b7038ce4c5c6ed002b7504ae6c5e96fe7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 7 Feb 2018 10:58:47 +0000 Subject: [PATCH 10/12] Write chunks found in lower-tier caches back to the upper tier. --- pkg/chunk/cache/tiered.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/chunk/cache/tiered.go b/pkg/chunk/cache/tiered.go index f2328ed28b5..9151e3afa74 100644 --- a/pkg/chunk/cache/tiered.go +++ b/pkg/chunk/cache/tiered.go @@ -21,6 +21,7 @@ func (t tiered) StoreChunk(ctx context.Context, key string, buf []byte) error { func (t tiered) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { found := make(map[string][]byte, len(keys)) missing := keys + previousCaches := make([]Cache, 0, len(t)) for _, c := range []Cache(t) { var ( @@ -36,7 +37,10 @@ func (t tiered) FetchChunkData(ctx context.Context, keys []string) ([]string, [] for i, key := range passKeys { found[key] = passBufs[i] + tiered(previousCaches).StoreChunk(ctx, key, passBufs[i]) } + + previousCaches = append(previousCaches, c) } resultKeys := make([]string, 0, len(found)) From 5ecd82208933fac9c2f76ba884122c9a91361de7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Wed, 7 Feb 2018 13:38:48 +0000 Subject: [PATCH 11/12] Query diskcache first. --- pkg/chunk/cache/cache.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go index 032062ee2c8..815f5c949be 100644 --- a/pkg/chunk/cache/cache.go +++ b/pkg/chunk/cache/cache.go @@ -35,12 +35,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { func New(cfg Config) (Cache, error) { caches := []Cache{} - if cfg.memcacheClient.Host != "" { - client := newMemcachedClient(cfg.memcacheClient) - cache := NewMemcached(cfg.memcache, client) - caches = append(caches, instrument("memcache", cache)) - } - if cfg.EnableDiskcache { cache, err := NewDiskcache(cfg.diskcache) if err != nil { @@ -49,6 +43,12 @@ func New(cfg Config) (Cache, error) { caches = append(caches, instrument("diskcache", cache)) } + if cfg.memcacheClient.Host != "" { + client := newMemcachedClient(cfg.memcacheClient) + cache := NewMemcached(cfg.memcache, client) + caches = append(caches, instrument("memcache", cache)) + } + var cache Cache = tiered(caches) if len(caches) > 1 { cache = instrument("tiered", cache) From a93268ff8f27a8e6719a7dc0178804f8eb94015c Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 20 Feb 2018 13:21:04 +0000 Subject: [PATCH 12/12] Fix nits --- pkg/chunk/cache/instrumented.go | 4 ++-- pkg/chunk/chunk_store.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/chunk/cache/instrumented.go b/pkg/chunk/cache/instrumented.go index f0104f4d734..2b582d2f0b3 100644 --- a/pkg/chunk/cache/instrumented.go +++ b/pkg/chunk/cache/instrumented.go @@ -21,13 +21,13 @@ var ( fetchedKeys = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_fetched_keys", - Help: "Total count of chunks requested from memcache.", + Help: "Total count of chunks requested from cache.", }, []string{"name"}) hits = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_hits", - Help: "Total count of chunks found in memcache.", + Help: "Total count of chunks found in cache.", }, []string{"name"}) ) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index aae93d92a69..b57b53caff0 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -37,7 +37,7 @@ var ( cacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "cortex", Name: "cache_corrupt_chunks_total", - Help: "Total count of corrupt chunks found in memcache.", + Help: "Total count of corrupt chunks found in cache.", }) ) @@ -262,7 +262,7 @@ outer: // ProcessCacheResponse decodes the chunks coming back from the cache, separating // hits and misses. func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { - ctx := NewDecodeContext() + decodeContext := NewDecodeContext() i, j := 0, 0 for i < len(chunks) && j < len(keys) { @@ -276,7 +276,7 @@ func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found [ j++ } else { chunk := chunks[i] - err = chunk.Decode(ctx, bufs[j]) + err = chunk.Decode(decodeContext, bufs[j]) if err != nil { cacheCorrupt.Inc() return