Skip to content
Merged
113 changes: 113 additions & 0 deletions pkg/chunk/cache/background.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package cache

import (
"context"
"flag"
"sync"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/cortex/pkg/util"
)

// Prometheus metrics exported by the background cache; registered in init.
var (
	// droppedWriteBack counts writes discarded because the background
	// queue was full (see StoreChunk's non-blocking send).
	droppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "cache_dropped_background_writes_total",
		Help:      "Total count of dropped write backs to cache.",
	})
	// queueLength tracks how many writes are buffered in bgWrites:
	// incremented on enqueue, decremented when a writer dequeues.
	queueLength = prometheus.NewGauge(prometheus.GaugeOpts{
		Namespace: "cortex",
		Name:      "cache_background_queue_length",
		Help:      "Length of the cache background write queue.",
	})
)

// init registers the background-cache metrics with the default
// Prometheus registry.
func init() {
	prometheus.MustRegister(
		droppedWriteBack,
		queueLength,
	)
}

// BackgroundConfig is config for a Background Cache.
type BackgroundConfig struct {
	WriteBackGoroutines int // number of concurrent writer goroutines
	WriteBackBuffer     int // capacity of the buffered write-back queue
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) {
	// Registration order does not affect flag parsing or -help output.
	f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.")
	f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.")
}

// backgroundCache wraps another Cache, queueing StoreChunk calls onto a
// channel drained by a pool of writer goroutines; reads pass straight
// through to the embedded Cache.
type backgroundCache struct {
	Cache

	wg       sync.WaitGroup       // tracks writer goroutines for Stop/Flush
	quit     chan struct{}        // closed by Stop to terminate the writers
	bgWrites chan backgroundWrite // buffered queue of pending writes
}

// backgroundWrite is a single pending cache write queued on bgWrites.
type backgroundWrite struct {
	key string // cache key to store under
	buf []byte // serialized chunk data
}

// NewBackground returns a Cache that performs stores asynchronously on a
// pool of background goroutines, delegating everything else to the given
// cache.
func NewBackground(cfg BackgroundConfig, cache Cache) Cache {
	bg := &backgroundCache{
		Cache:    cache,
		quit:     make(chan struct{}),
		bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer),
	}

	// Start the writer pool; Stop waits on this WaitGroup.
	bg.wg.Add(cfg.WriteBackGoroutines)
	for i := 0; i < cfg.WriteBackGoroutines; i++ {
		go bg.writeBackLoop()
	}
	return bg
}

// Stop the background flushing goroutines.
func (c *backgroundCache) Stop() error {
	// Signal every writeBackLoop to exit, then wait for them all.
	// Writes still buffered in bgWrites at this point are abandoned.
	close(c.quit)
	c.wg.Wait()

	return c.Cache.Stop()
}

// StoreChunk enqueues the write for the background goroutines and returns
// immediately; if the queue is full the write is dropped (and counted in
// the droppedWriteBack metric). It never returns an error.
func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error {
	select {
	case c.bgWrites <- backgroundWrite{key: key, buf: buf}:
		queueLength.Inc()
	default:
		// Queue full: drop rather than block the caller.
		droppedWriteBack.Inc()
	}
	return nil
}

// writeBackLoop drains bgWrites, storing each queued chunk in the wrapped
// cache, until the queue is closed (test Flush helper) or Stop closes quit.
func (c *backgroundCache) writeBackLoop() {
	defer c.wg.Done()

	for {
		select {
		case bgWrite, ok := <-c.bgWrites:
			if !ok {
				// Channel closed and drained; exit cleanly.
				return
			}
			queueLength.Dec()
			err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf)
			if err != nil {
				// Best effort: log and keep draining the queue.
				level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err)
			}
		case <-c.quit:
			// NOTE(review): writes still pending in bgWrites are dropped
			// here without decrementing queueLength or counting them as
			// dropped — the gauge can be left stale after Stop.
			return
		}
	}
}
7 changes: 7 additions & 0 deletions pkg/chunk/cache/background_extra_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cache

// Flush closes the background write queue and waits for the writer
// goroutines to drain it. Test helper only: after Flush the cache must not
// receive further StoreChunk calls, since they would send on the closed
// bgWrites channel.
func Flush(c Cache) {
	b := c.(*backgroundCache)
	close(b.bgWrites)
	b.wg.Wait()
}
60 changes: 60 additions & 0 deletions pkg/chunk/cache/background_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package cache_test

import (
"context"
"sync"
"testing"

"github.com/weaveworks/cortex/pkg/chunk/cache"
)

// mockCache is an in-memory Cache implementation for tests, guarded by a
// mutex so the background writer goroutines can store concurrently.
type mockCache struct {
	sync.Mutex
	cache map[string][]byte
}

// StoreChunk records buf under key in the in-memory map.
func (m *mockCache) StoreChunk(_ context.Context, key string, buf []byte) error {
	m.Lock()
	m.cache[key] = buf
	m.Unlock()
	return nil
}

// FetchChunkData partitions keys into those present in the map (returned
// with their buffers, in matching order) and those missing.
func (m *mockCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) {
	m.Lock()
	defer m.Unlock()
	for _, k := range keys {
		if buf, ok := m.cache[k]; ok {
			found = append(found, k)
			bufs = append(bufs, buf)
		} else {
			missing = append(missing, k)
		}
	}
	return found, bufs, missing, nil
}

// Stop is a no-op for the mock cache.
func (m *mockCache) Stop() error {
	return nil
}

// newMockCache returns an empty mockCache ready for use.
func newMockCache() cache.Cache {
	m := &mockCache{
		cache: make(map[string][]byte),
	}
	return m
}

// TestBackground fills a background-wrapped mock cache, flushes the queued
// writes, then runs the shared read checks against it.
func TestBackground(t *testing.T) {
	c := cache.NewBackground(cache.BackgroundConfig{
		WriteBackGoroutines: 1,
		WriteBackBuffer:     100,
	}, newMockCache())

	keys, chunks := fillCache(t, c)
	// Stores are async; Flush waits until they all land in the mock cache.
	cache.Flush(c)

	testCacheSingle(t, c, keys, chunks)
	testCacheMultiple(t, c, keys, chunks)
	testCacheMiss(t, c)
}
59 changes: 59 additions & 0 deletions pkg/chunk/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cache

import (
"context"
"flag"
)

// Cache byte arrays by key.
type Cache interface {
	// StoreChunk stores buf under key.
	StoreChunk(ctx context.Context, key string, buf []byte) error
	// FetchChunkData returns the keys that were found (with their buffers
	// in matching order) and the keys that were not.
	FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error)
	// Stop releases any resources held by the cache.
	Stop() error
}

// Config for building Caches.
type Config struct {
	EnableDiskcache bool // whether New includes the on-disk cache layer

	// Sub-configs for each layer New may assemble.
	background     BackgroundConfig
	memcache       MemcachedConfig
	memcacheClient MemcachedClientConfig
	diskcache      DiskcacheConfig
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache")

	// Register flags for every sub-config. The background config was
	// previously omitted here, so New() built the background cache with
	// the zero values (0 goroutines, 0-length buffer), which made every
	// StoreChunk hit the full-queue branch and be silently dropped.
	cfg.background.RegisterFlags(f)
	cfg.memcache.RegisterFlags(f)
	cfg.memcacheClient.RegisterFlags(f)
	cfg.diskcache.RegisterFlags(f)
}

// New creates a new Cache using Config: an optional diskcache layer, an
// optional memcache layer (when a memcache host is configured), combined
// into a tiered cache and wrapped with background write-back.
func New(cfg Config) (Cache, error) {
	caches := []Cache{}

	if cfg.EnableDiskcache {
		dc, err := NewDiskcache(cfg.diskcache)
		if err != nil {
			return nil, err
		}
		caches = append(caches, instrument("diskcache", dc))
	}

	if cfg.memcacheClient.Host != "" {
		mc := NewMemcached(cfg.memcache, newMemcachedClient(cfg.memcacheClient))
		caches = append(caches, instrument("memcache", mc))
	}

	// Only instrument the tiered wrapper when it actually combines layers.
	combined := Cache(tiered(caches))
	if len(caches) > 1 {
		combined = instrument("tiered", combined)
	}

	return NewBackground(cfg.background, combined), nil
}
125 changes: 125 additions & 0 deletions pkg/chunk/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package cache_test

import (
"context"
"math/rand"
"os"
"path"
"strconv"
"testing"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/weaveworks/cortex/pkg/chunk"
"github.com/weaveworks/cortex/pkg/chunk/cache"
prom_chunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk"
)

// fillCache stores 100 encoded chunks (one per 13-hour window starting at
// the Unix epoch) in the given cache, returning their keys alongside the
// original chunks for later comparison.
func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) {
	const (
		userID   = "1"
		chunkLen = 13 * 3600 // in seconds
	)

	// put 100 chunks from 0 to 99
	keys := []string{}
	chunks := []chunk.Chunk{}
	for i := 0; i < 100; i++ {
		ts := model.TimeFromUnix(int64(i * chunkLen))
		// Single-sample chunk; the Add error is deliberately ignored —
		// NOTE(review): presumably Add cannot fail on the first sample;
		// confirm against the prom1 chunk implementation.
		promChunk, _ := prom_chunk.New().Add(model.SamplePair{
			Timestamp: ts,
			Value:     model.SampleValue(i),
		})
		c := chunk.NewChunk(
			userID,
			model.Fingerprint(1),
			model.Metric{
				model.MetricNameLabel: "foo",
				"bar":                 "baz",
			},
			promChunk[0],
			ts,
			ts.Add(chunkLen),
		)

		buf, err := c.Encode()
		require.NoError(t, err)

		key := c.ExternalKey()
		err = cache.StoreChunk(context.Background(), key, buf)
		require.NoError(t, err)

		keys = append(keys, key)
		chunks = append(chunks, c)
	}

	return keys, chunks
}

// testCacheSingle fetches 100 randomly chosen keys one at a time and
// checks that each round-trips back to its original chunk.
func testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
	for n := 0; n < 100; n++ {
		i := rand.Intn(len(keys))

		found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), []string{keys[i]})
		require.NoError(t, err)
		require.Len(t, found, 1)
		require.Len(t, bufs, 1)
		require.Len(t, missingKeys, 0)

		foundChunks, missing, err := chunk.ProcessCacheResponse([]chunk.Chunk{chunks[i]}, found, bufs)
		require.NoError(t, err)
		require.Empty(t, missing)
		require.Equal(t, chunks[i], foundChunks[0])
	}
}

// testCacheMultiple fetches every key in a single call and checks that all
// chunks round-trip with nothing reported missing.
func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) {
	ctx := context.Background()
	foundKeys, foundBufs, missingKeys, err := cache.FetchChunkData(ctx, keys)
	require.NoError(t, err)
	require.Len(t, foundKeys, len(keys))
	require.Len(t, foundBufs, len(keys))
	require.Len(t, missingKeys, 0)

	foundChunks, missingChunks, err := chunk.ProcessCacheResponse(chunks, foundKeys, foundBufs)
	require.NoError(t, err)
	require.Empty(t, missingChunks)
	require.Equal(t, chunks, foundChunks)
}

// testCacheMiss looks up 100 random numeric keys that were never stored
// and checks that every lookup reports exactly one miss.
func testCacheMiss(t *testing.T, cache cache.Cache) {
	for attempt := 0; attempt < 100; attempt++ {
		key := strconv.Itoa(rand.Int())
		found, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{key})
		require.NoError(t, err)
		require.Empty(t, found)
		require.Empty(t, bufs)
		require.Len(t, missing, 1)
	}
}

// testCache runs the full shared suite against a cache implementation:
// fill it, then exercise single fetches, batch fetches, and misses.
func testCache(t *testing.T, cache cache.Cache) {
	keys, chunks := fillCache(t, cache)
	testCacheSingle(t, cache, keys, chunks)
	testCacheMultiple(t, cache, keys, chunks)
	testCacheMiss(t, cache)
}

// TestMemcache runs the shared cache suite against the memcached-backed
// cache, using the in-process mock memcache client.
func TestMemcache(t *testing.T) {
	cache := cache.NewMemcached(cache.MemcachedConfig{}, newMockMemcache())
	testCache(t, cache)
}

// TestDiskcache runs the shared cache suite against the on-disk cache,
// backed by a temporary file that is removed when the test ends.
func TestDiskcache(t *testing.T) {
	dirname := os.TempDir()
	// NOTE(review): path.Join assumes slash-separated paths; filepath.Join
	// would be correct on Windows — confirm target platforms.
	filename := path.Join(dirname, "diskcache")
	defer os.RemoveAll(filename)

	cache, err := cache.NewDiskcache(cache.DiskcacheConfig{
		Path: filename,
		Size: 100 * 1024 * 1024,
	})
	require.NoError(t, err)
	// Previously the cache was never stopped, leaking its file handle.
	// Defers run LIFO, so Stop fires before the RemoveAll above.
	defer func() {
		require.NoError(t, cache.Stop())
	}()
	testCache(t, cache)
}
Loading