diff --git a/pkg/chunk/cache/background.go b/pkg/chunk/cache/background.go new file mode 100644 index 00000000000..2560cc49a7e --- /dev/null +++ b/pkg/chunk/cache/background.go @@ -0,0 +1,113 @@ +package cache + +import ( + "context" + "flag" + "sync" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/cortex/pkg/util" +) + +var ( + droppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_dropped_background_writes_total", + Help: "Total count of dropped write backs to cache.", + }) + queueLength = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "cache_background_queue_length", + Help: "Length of the cache background write queue.", + }) +) + +func init() { + prometheus.MustRegister(droppedWriteBack) + prometheus.MustRegister(queueLength) +} + +// BackgroundConfig is config for a Background Cache. +type BackgroundConfig struct { + WriteBackGoroutines int + WriteBackBuffer int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *BackgroundConfig) RegisterFlags(f *flag.FlagSet) { + f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") + f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") +} + +type backgroundCache struct { + Cache + + wg sync.WaitGroup + quit chan struct{} + bgWrites chan backgroundWrite +} + +type backgroundWrite struct { + key string + buf []byte +} + +// NewBackground returns a new Cache that does stores on background goroutines. 
+func NewBackground(cfg BackgroundConfig, cache Cache) Cache { + c := &backgroundCache{ + Cache: cache, + quit: make(chan struct{}), + bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), + } + + c.wg.Add(cfg.WriteBackGoroutines) + for i := 0; i < cfg.WriteBackGoroutines; i++ { + go c.writeBackLoop() + } + + return c +} + +// Stop the background flushing goroutines. +func (c *backgroundCache) Stop() error { + close(c.quit) + c.wg.Wait() + + return c.Cache.Stop() +} + +// StoreChunk writes chunks for the cache in the background. +func (c *backgroundCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + bgWrite := backgroundWrite{ + key: key, + buf: buf, + } + select { + case c.bgWrites <- bgWrite: + queueLength.Inc() + default: + droppedWriteBack.Inc() + } + return nil +} + +func (c *backgroundCache) writeBackLoop() { + defer c.wg.Done() + + for { + select { + case bgWrite, ok := <-c.bgWrites: + if !ok { + return + } + queueLength.Dec() + err := c.Cache.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) + if err != nil { + level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err) + } + case <-c.quit: + return + } + } +} diff --git a/pkg/chunk/cache/background_extra_test.go b/pkg/chunk/cache/background_extra_test.go new file mode 100644 index 00000000000..07aa21c847f --- /dev/null +++ b/pkg/chunk/cache/background_extra_test.go @@ -0,0 +1,7 @@ +package cache + +func Flush(c Cache) { + b := c.(*backgroundCache) + close(b.bgWrites) + b.wg.Wait() +} diff --git a/pkg/chunk/cache/background_test.go b/pkg/chunk/cache/background_test.go new file mode 100644 index 00000000000..ebf540abd75 --- /dev/null +++ b/pkg/chunk/cache/background_test.go @@ -0,0 +1,60 @@ +package cache_test + +import ( + "context" + "sync" + "testing" + + "github.com/weaveworks/cortex/pkg/chunk/cache" +) + +type mockCache struct { + sync.Mutex + cache map[string][]byte +} + +func (m *mockCache) StoreChunk(_ context.Context, key string, buf []byte) error 
{ + m.Lock() + defer m.Unlock() + m.cache[key] = buf + return nil +} + +func (m *mockCache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) { + m.Lock() + defer m.Unlock() + for _, key := range keys { + buf, ok := m.cache[key] + if ok { + found = append(found, key) + bufs = append(bufs, buf) + } else { + missing = append(missing, key) + } + } + return +} + +func (m *mockCache) Stop() error { + return nil +} + +func newMockCache() cache.Cache { + return &mockCache{ + cache: map[string][]byte{}, + } +} + +func TestBackground(t *testing.T) { + c := cache.NewBackground(cache.BackgroundConfig{ + WriteBackGoroutines: 1, + WriteBackBuffer: 100, + }, newMockCache()) + + keys, chunks := fillCache(t, c) + cache.Flush(c) + + testCacheSingle(t, c, keys, chunks) + testCacheMultiple(t, c, keys, chunks) + testCacheMiss(t, c) +} diff --git a/pkg/chunk/cache/cache.go b/pkg/chunk/cache/cache.go new file mode 100644 index 00000000000..815f5c949be --- /dev/null +++ b/pkg/chunk/cache/cache.go @@ -0,0 +1,59 @@ +package cache + +import ( + "context" + "flag" +) + +// Cache byte arrays by key. +type Cache interface { + StoreChunk(ctx context.Context, key string, buf []byte) error + FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missing []string, err error) + Stop() error +} + +// Config for building Caches. +type Config struct { + EnableDiskcache bool + + background BackgroundConfig + memcache MemcachedConfig + memcacheClient MemcachedClientConfig + diskcache DiskcacheConfig +} + +// RegisterFlags adds the flags required to config this to the given FlagSet. +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.BoolVar(&cfg.EnableDiskcache, "cache.enable-diskcache", false, "Enable on-disk cache") + + cfg.memcache.RegisterFlags(f) + cfg.memcacheClient.RegisterFlags(f) + cfg.diskcache.RegisterFlags(f) +} + +// New creates a new Cache using Config. 
+func New(cfg Config) (Cache, error) { + caches := []Cache{} + + if cfg.EnableDiskcache { + cache, err := NewDiskcache(cfg.diskcache) + if err != nil { + return nil, err + } + caches = append(caches, instrument("diskcache", cache)) + } + + if cfg.memcacheClient.Host != "" { + client := newMemcachedClient(cfg.memcacheClient) + cache := NewMemcached(cfg.memcache, client) + caches = append(caches, instrument("memcache", cache)) + } + + var cache Cache = tiered(caches) + if len(caches) > 1 { + cache = instrument("tiered", cache) + } + + cache = NewBackground(cfg.background, cache) + return cache, nil +} diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go new file mode 100644 index 00000000000..b0df014dd23 --- /dev/null +++ b/pkg/chunk/cache/cache_test.go @@ -0,0 +1,125 @@ +package cache_test + +import ( + "context" + "math/rand" + "os" + "path" + "strconv" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/cache" + prom_chunk "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" +) + +func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { + const ( + userID = "1" + chunkLen = 13 * 3600 // in seconds + ) + + // put 100 chunks from 0 to 99 + keys := []string{} + chunks := []chunk.Chunk{} + for i := 0; i < 100; i++ { + ts := model.TimeFromUnix(int64(i * chunkLen)) + promChunk, _ := prom_chunk.New().Add(model.SamplePair{ + Timestamp: ts, + Value: model.SampleValue(i), + }) + c := chunk.NewChunk( + userID, + model.Fingerprint(1), + model.Metric{ + model.MetricNameLabel: "foo", + "bar": "baz", + }, + promChunk[0], + ts, + ts.Add(chunkLen), + ) + + buf, err := c.Encode() + require.NoError(t, err) + + key := c.ExternalKey() + err = cache.StoreChunk(context.Background(), key, buf) + require.NoError(t, err) + + keys = append(keys, key) + chunks = append(chunks, c) + } + + return keys, chunks +} + +func 
testCacheSingle(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) { + for i := 0; i < 100; i++ { + index := rand.Intn(len(keys)) + key := keys[index] + + found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), []string{key}) + require.NoError(t, err) + require.Len(t, found, 1) + require.Len(t, bufs, 1) + require.Len(t, missingKeys, 0) + + foundChunks, missing, err := chunk.ProcessCacheResponse([]chunk.Chunk{chunks[index]}, found, bufs) + require.NoError(t, err) + require.Empty(t, missing) + require.Equal(t, chunks[index], foundChunks[0]) + } +} + +func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []chunk.Chunk) { + // test getting them all + found, bufs, missingKeys, err := cache.FetchChunkData(context.Background(), keys) + require.NoError(t, err) + require.Len(t, found, len(keys)) + require.Len(t, bufs, len(keys)) + require.Len(t, missingKeys, 0) + + foundChunks, missing, err := chunk.ProcessCacheResponse(chunks, found, bufs) + require.NoError(t, err) + require.Empty(t, missing) + require.Equal(t, chunks, foundChunks) +} + +func testCacheMiss(t *testing.T, cache cache.Cache) { + for i := 0; i < 100; i++ { + key := strconv.Itoa(rand.Int()) + found, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{key}) + require.NoError(t, err) + require.Empty(t, found) + require.Empty(t, bufs) + require.Len(t, missing, 1) + } +} + +func testCache(t *testing.T, cache cache.Cache) { + keys, chunks := fillCache(t, cache) + testCacheSingle(t, cache, keys, chunks) + testCacheMultiple(t, cache, keys, chunks) + testCacheMiss(t, cache) +} + +func TestMemcache(t *testing.T) { + cache := cache.NewMemcached(cache.MemcachedConfig{}, newMockMemcache()) + testCache(t, cache) +} + +func TestDiskcache(t *testing.T) { + dirname := os.TempDir() + filename := path.Join(dirname, "diskcache") + defer os.RemoveAll(filename) + + cache, err := cache.NewDiskcache(cache.DiskcacheConfig{ + Path: filename, + Size: 
100 * 1024 * 1024, + }) + require.NoError(t, err) + testCache(t, cache) +} diff --git a/pkg/chunk/cache/diskcache.go b/pkg/chunk/cache/diskcache.go new file mode 100644 index 00000000000..d32683aa40a --- /dev/null +++ b/pkg/chunk/cache/diskcache.go @@ -0,0 +1,160 @@ +package cache + +import ( + "context" + "encoding/binary" + "flag" + "fmt" + "hash/fnv" + "os" + "sync" + + "github.com/pkg/errors" + "github.com/prometheus/tsdb/fileutil" + "golang.org/x/sys/unix" +) + +// TODO: in the future we could cuckoo hash or linear probe. + +// Buckets contain key (~50), chunks (1024) and their metadata (~100) +const bucketSize = 2048 + +// DiskcacheConfig for the Disk cache. +type DiskcacheConfig struct { + Path string + Size int +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *DiskcacheConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Path, "diskcache.path", "/var/run/chunks", "Path to file used to cache chunks.") + f.IntVar(&cfg.Size, "diskcache.size", 1024*1024*1024, "Size of file (bytes)") +} + +// Diskcache is an on-disk chunk cache. +type Diskcache struct { + mtx sync.RWMutex + f *os.File + buckets uint32 + buf []byte +} + +// NewDiskcache creates a new on-disk cache. +func NewDiskcache(cfg DiskcacheConfig) (*Diskcache, error) { + f, err := os.OpenFile(cfg.Path, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, errors.Wrap(err, "open") + } + + if err := fileutil.Preallocate(f, int64(cfg.Size), true); err != nil { + return nil, errors.Wrap(err, "preallocate") + } + + info, err := f.Stat() + if err != nil { + return nil, errors.Wrap(err, "stat") + } + + buf, err := unix.Mmap(int(f.Fd()), 0, int(info.Size()), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED) + if err != nil { + f.Close() + return nil, err + } + + buckets := len(buf) / bucketSize + + return &Diskcache{ + f: f, + buf: buf, + buckets: uint32(buckets), + }, nil +} + +// Stop closes the file. 
+func (d *Diskcache) Stop() error { + if err := unix.Munmap(d.buf); err != nil { + return err + } + return d.f.Close() +} + +// FetchChunkData get chunks from the cache. +func (d *Diskcache) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { + for _, key := range keys { + buf, ok := d.fetch(key) + if ok { + found = append(found, key) + bufs = append(bufs, buf) + } else { + missed = append(missed, key) + } + } + return +} + +func (d *Diskcache) fetch(key string) ([]byte, bool) { + d.mtx.RLock() + defer d.mtx.RUnlock() + + bucket := hash(key) % d.buckets + buf := d.buf[bucket*bucketSize : (bucket+1)*bucketSize] + + existingKey, n, ok := get(buf, 0) + if !ok || string(existingKey) != key { + return nil, false + } + + existingValue, _, ok := get(buf, n) + if !ok { + return nil, false + } + + result := make([]byte, len(existingValue), len(existingValue)) + copy(result, existingValue) + return result, true +} + +// StoreChunk puts a chunk into the cache. 
+func (d *Diskcache) StoreChunk(ctx context.Context, key string, value []byte) error { + d.mtx.Lock() + defer d.mtx.Unlock() + + bucket := hash(key) % d.buckets + buf := d.buf[bucket*bucketSize : (bucket+1)*bucketSize] + + n, err := put([]byte(key), buf, 0) + if err != nil { + return err + } + + _, err = put(value, buf, n) + if err != nil { + return err + } + + return nil +} + +func put(value []byte, buf []byte, n int) (int, error) { + if len(value)+n+4 > len(buf) { + return 0, errors.Wrap(fmt.Errorf("value too big: %d > %d", len(value), len(buf)), "put") + } + m := binary.PutUvarint(buf[n:], uint64(len(value))) + copy(buf[n+m:], value) + return len(value) + n + m, nil +} + +func get(buf []byte, n int) ([]byte, int, bool) { + size, m := binary.Uvarint(buf[n:]) + end := n + m + int(size) + if end > len(buf) { + return nil, 0, false + } + return buf[n+m : end], end, true +} + +func hash(key string) uint32 { + h := fnv.New32() + h.Write([]byte(key)) + return h.Sum32() +} diff --git a/pkg/chunk/cache/instrumented.go b/pkg/chunk/cache/instrumented.go new file mode 100644 index 00000000000..2b582d2f0b3 --- /dev/null +++ b/pkg/chunk/cache/instrumented.go @@ -0,0 +1,89 @@ +package cache + +import ( + "context" + + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/client_golang/prometheus" + instr "github.com/weaveworks/common/instrument" +) + +var ( + requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "cache_request_duration_seconds", + Help: "Total time spent in seconds doing cache requests.", + // Cache requests are very quick: smallest bucket is 16us, biggest is 1s. 
+ Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), + }, []string{"method", "status_code"}) + + fetchedKeys = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_fetched_keys", + Help: "Total count of chunks requested from cache.", + }, []string{"name"}) + + hits = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_hits", + Help: "Total count of chunks found in cache.", + }, []string{"name"}) +) + +func init() { + prometheus.MustRegister(requestDuration) + prometheus.MustRegister(fetchedKeys) + prometheus.MustRegister(hits) +} + +func instrument(name string, cache Cache) Cache { + return &instrumentedCache{ + name: name, + fetchedKeys: fetchedKeys.WithLabelValues(name), + hits: hits.WithLabelValues(name), + Cache: cache, + } +} + +type instrumentedCache struct { + name string + fetchedKeys, hits prometheus.Counter + Cache +} + +func (i *instrumentedCache) StoreChunk(ctx context.Context, key string, buf []byte) error { + return instr.TimeRequestHistogram(ctx, i.name+".store", requestDuration, func(ctx context.Context) error { + return i.Cache.StoreChunk(ctx, key, buf) + }) +} + +func (i *instrumentedCache) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { + var ( + found []string + bufs [][]byte + missing []string + ) + err := instr.TimeRequestHistogram(ctx, i.name+".fetch", requestDuration, func(ctx context.Context) error { + sp := ot.SpanFromContext(ctx) + sp.LogFields(otlog.Int("chunks requested", len(keys))) + + var err error + found, bufs, missing, err = i.Cache.FetchChunkData(ctx, keys) + + if err == nil { + sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(keys)-len(found))) + } else { + sp.LogFields(otlog.Error(err)) + } + + return err + }) + i.fetchedKeys.Add(float64(len(keys))) + i.hits.Add(float64(len(found))) + return found, bufs, missing, err +} + +func (i *instrumentedCache) Stop() error { + return 
i.Cache.Stop() +} diff --git a/pkg/chunk/cache/memcached.go b/pkg/chunk/cache/memcached.go new file mode 100644 index 00000000000..7cfb33d4c71 --- /dev/null +++ b/pkg/chunk/cache/memcached.go @@ -0,0 +1,104 @@ +package cache + +import ( + "context" + "flag" + "time" + + "github.com/bradfitz/gomemcache/memcache" + "github.com/prometheus/client_golang/prometheus" + instr "github.com/weaveworks/common/instrument" +) + +var ( + memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "memcache_request_duration_seconds", + Help: "Total time spent in seconds doing memcache requests.", + // Memecache requests are very quick: smallest bucket is 16us, biggest is 1s + Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), + }, []string{"method", "status_code"}) +) + +func init() { + prometheus.MustRegister(memcacheRequestDuration) +} + +// MemcachedConfig is config to make a Memcached +type MemcachedConfig struct { + Expiration time.Duration +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *MemcachedConfig) RegisterFlags(f *flag.FlagSet) { + f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") +} + +// Memcached type caches chunks in memcached +type Memcached struct { + cfg MemcachedConfig + memcache MemcachedClient +} + +// NewMemcached makes a new Memcache +func NewMemcached(cfg MemcachedConfig, client MemcachedClient) *Memcached { + c := &Memcached{ + cfg: cfg, + memcache: client, + } + return c +} + +func memcacheStatusCode(err error) string { + // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables + switch err { + case nil: + return "200" + case memcache.ErrCacheMiss: + return "404" + case memcache.ErrMalformedKey: + return "400" + default: + return "500" + } +} + +// FetchChunkData gets chunks from the chunk cache. 
+func (c *Memcached) FetchChunkData(ctx context.Context, keys []string) (found []string, bufs [][]byte, missed []string, err error) { + var items map[string]*memcache.Item + err = instr.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + var err error + items, err = c.memcache.GetMulti(keys) + return err + }) + if err != nil { + return + } + for _, key := range keys { + item, ok := items[key] + if ok { + found = append(found, key) + bufs = append(bufs, item.Value) + } else { + missed = append(missed, key) + } + } + return +} + +// StoreChunk serializes and stores a chunk in the chunk cache. +func (c *Memcached) StoreChunk(ctx context.Context, key string, buf []byte) error { + return instr.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { + item := memcache.Item{ + Key: key, + Value: buf, + Expiration: int32(c.cfg.Expiration.Seconds()), + } + return c.memcache.Set(&item) + }) +} + +// Stop does nothing. +func (*Memcached) Stop() error { + return nil +} diff --git a/pkg/chunk/memcache_client.go b/pkg/chunk/cache/memcached_client.go similarity index 75% rename from pkg/chunk/memcache_client.go rename to pkg/chunk/cache/memcached_client.go index f88556479ba..43a52fffab3 100644 --- a/pkg/chunk/memcache_client.go +++ b/pkg/chunk/cache/memcached_client.go @@ -1,4 +1,4 @@ -package chunk +package cache import ( "flag" @@ -13,9 +13,15 @@ import ( "github.com/weaveworks/cortex/pkg/util" ) -// MemcacheClient is a memcache client that gets its server list from SRV +// MemcachedClient interface exists for mocking memcacheClient. +type MemcachedClient interface { + GetMulti(keys []string) (map[string]*memcache.Item, error) + Set(item *memcache.Item) error +} + +// memcachedClient is a memcache client that gets its server list from SRV // records, and periodically updates that ServerList. 
-type MemcacheClient struct { +type memcachedClient struct { *memcache.Client serverList *memcache.ServerList hostname string @@ -25,8 +31,8 @@ type MemcacheClient struct { wait sync.WaitGroup } -// MemcacheConfig defines how a MemcacheClient should be constructed. -type MemcacheConfig struct { +// MemcachedClientConfig defines how a MemcachedClient should be constructed. +type MemcachedClientConfig struct { Host string Service string Timeout time.Duration @@ -34,21 +40,21 @@ type MemcacheConfig struct { } // RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *MemcacheConfig) RegisterFlags(f *flag.FlagSet) { +func (cfg *MemcachedClientConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Host, "memcached.hostname", "", "Hostname for memcached service to use when caching chunks. If empty, no memcached will be used.") f.StringVar(&cfg.Service, "memcached.service", "memcached", "SRV service used to discover memcache servers.") f.DurationVar(&cfg.Timeout, "memcached.timeout", 100*time.Millisecond, "Maximum time to wait before giving up on memcached requests.") f.DurationVar(&cfg.UpdateInterval, "memcached.update-interval", 1*time.Minute, "Period with which to poll DNS for memcache servers.") } -// NewMemcacheClient creates a new MemcacheClient that gets its server list +// newMemcachedClient creates a new MemcacheClient that gets its server list // from SRV and updates the server list on a regular basis. -func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient { +func newMemcachedClient(cfg MemcachedClientConfig) *memcachedClient { var servers memcache.ServerList client := memcache.NewFromSelector(&servers) client.Timeout = cfg.Timeout - newClient := &MemcacheClient{ + newClient := &memcachedClient{ Client: client, serverList: &servers, hostname: cfg.Host, @@ -66,12 +72,12 @@ func NewMemcacheClient(cfg MemcacheConfig) *MemcacheClient { } // Stop the memcache client. 
-func (c *MemcacheClient) Stop() { +func (c *memcachedClient) Stop() { close(c.quit) c.wait.Wait() } -func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error { +func (c *memcachedClient) updateLoop(updateInterval time.Duration) error { defer c.wait.Done() ticker := time.NewTicker(updateInterval) var err error @@ -90,7 +96,7 @@ func (c *MemcacheClient) updateLoop(updateInterval time.Duration) error { // updateMemcacheServers sets a memcache server list from SRV records. SRV // priority & weight are ignored. -func (c *MemcacheClient) updateMemcacheServers() error { +func (c *memcachedClient) updateMemcacheServers() error { _, addrs, err := net.LookupSRV(c.service, "tcp", c.hostname) if err != nil { return err diff --git a/pkg/chunk/cache/memcached_test.go b/pkg/chunk/cache/memcached_test.go new file mode 100644 index 00000000000..028fba8ef46 --- /dev/null +++ b/pkg/chunk/cache/memcached_test.go @@ -0,0 +1,39 @@ +package cache_test + +import ( + "sync" + + "github.com/bradfitz/gomemcache/memcache" +) + +type mockMemcache struct { + sync.RWMutex + contents map[string][]byte +} + +func newMockMemcache() *mockMemcache { + return &mockMemcache{ + contents: map[string][]byte{}, + } +} + +func (m *mockMemcache) GetMulti(keys []string) (map[string]*memcache.Item, error) { + m.RLock() + defer m.RUnlock() + result := map[string]*memcache.Item{} + for _, k := range keys { + if c, ok := m.contents[k]; ok { + result[k] = &memcache.Item{ + Value: c, + } + } + } + return result, nil +} + +func (m *mockMemcache) Set(item *memcache.Item) error { + m.Lock() + defer m.Unlock() + m.contents[item.Key] = item.Value + return nil +} diff --git a/pkg/chunk/cache/tiered.go b/pkg/chunk/cache/tiered.go new file mode 100644 index 00000000000..9151e3afa74 --- /dev/null +++ b/pkg/chunk/cache/tiered.go @@ -0,0 +1,65 @@ +package cache + +import "context" + +type tiered []Cache + +// NewTiered makes a new tiered cache. 
+func NewTiered(caches []Cache) Cache { + return tiered(caches) +} + +func (t tiered) StoreChunk(ctx context.Context, key string, buf []byte) error { + for _, c := range []Cache(t) { + if err := c.StoreChunk(ctx, key, buf); err != nil { + return err + } + } + return nil +} + +func (t tiered) FetchChunkData(ctx context.Context, keys []string) ([]string, [][]byte, []string, error) { + found := make(map[string][]byte, len(keys)) + missing := keys + previousCaches := make([]Cache, 0, len(t)) + + for _, c := range []Cache(t) { + var ( + err error + passKeys []string + passBufs [][]byte + ) + + passKeys, passBufs, missing, err = c.FetchChunkData(ctx, missing) + if err != nil { + return nil, nil, nil, err + } + + for i, key := range passKeys { + found[key] = passBufs[i] + tiered(previousCaches).StoreChunk(ctx, key, passBufs[i]) + } + + previousCaches = append(previousCaches, c) + } + + resultKeys := make([]string, 0, len(found)) + resultBufs := make([][]byte, 0, len(found)) + for _, key := range keys { + if buf, ok := found[key]; ok { + resultKeys = append(resultKeys, key) + resultBufs = append(resultBufs, buf) + } + } + + return resultKeys, resultBufs, missing, nil +} + +func (t tiered) Stop() error { + for _, c := range []Cache(t) { + if err := c.Stop(); err != nil { + return err + } + } + return nil +} diff --git a/pkg/chunk/cache/tiered_test.go b/pkg/chunk/cache/tiered_test.go new file mode 100644 index 00000000000..32cc3b6bc46 --- /dev/null +++ b/pkg/chunk/cache/tiered_test.go @@ -0,0 +1,37 @@ +package cache_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/weaveworks/cortex/pkg/chunk/cache" +) + +func TestTieredSimple(t *testing.T) { + for i := 1; i < 10; i++ { + caches := []cache.Cache{} + for j := 0; j <= i; j++ { + caches = append(caches, newMockCache()) + } + cache := cache.NewTiered(caches) + testCache(t, cache) + } +} + +func TestTiered(t *testing.T) { + level1, level2 := newMockCache(), newMockCache() + cache := 
cache.NewTiered([]cache.Cache{level1, level2}) + + err := level1.StoreChunk(context.Background(), "key1", []byte("hello")) + require.NoError(t, err) + + err = level2.StoreChunk(context.Background(), "key2", []byte("world")) + require.NoError(t, err) + + keys, bufs, missing, err := cache.FetchChunkData(context.Background(), []string{"key1", "key2", "key3"}) + require.NoError(t, err) + require.Equal(t, []string{"key1", "key2"}, keys) + require.Equal(t, [][]byte{[]byte("hello"), []byte("world")}, bufs) + require.Equal(t, []string{"key3"}, missing) +} diff --git a/pkg/chunk/chunk_cache.go b/pkg/chunk/chunk_cache.go deleted file mode 100644 index 3c5ae9459bf..00000000000 --- a/pkg/chunk/chunk_cache.go +++ /dev/null @@ -1,231 +0,0 @@ -package chunk - -import ( - "context" - "flag" - "sync" - "time" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/go-kit/kit/log/level" - ot "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/weaveworks/common/instrument" - - "github.com/weaveworks/cortex/pkg/util" -) - -var ( - memcacheRequests = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_requests_total", - Help: "Total count of chunks requested from memcache.", - }) - - memcacheHits = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_hits_total", - Help: "Total count of chunks found in memcache.", - }) - - memcacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_corrupt_chunks_total", - Help: "Total count of corrupt chunks found in memcache.", - }) - - memcacheDroppedWriteBack = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "memcache_dropped_write_back", - Help: "Total count of dropped write backs to memcache.", - }) - - memcacheRequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: 
"cortex", - Name: "memcache_request_duration_seconds", - Help: "Total time spent in seconds doing memcache requests.", - // Memecache requests are very quick: smallest bucket is 16us, biggest is 1s - Buckets: prometheus.ExponentialBuckets(0.000016, 4, 8), - }, []string{"method", "status_code"}) -) - -func init() { - prometheus.MustRegister(memcacheRequests) - prometheus.MustRegister(memcacheHits) - prometheus.MustRegister(memcacheCorrupt) - prometheus.MustRegister(memcacheDroppedWriteBack) - prometheus.MustRegister(memcacheRequestDuration) -} - -// Memcache caches things -type Memcache interface { - GetMulti(keys []string) (map[string]*memcache.Item, error) - Set(item *memcache.Item) error -} - -// CacheConfig is config to make a Cache -type CacheConfig struct { - Expiration time.Duration - WriteBackGoroutines int - WriteBackBuffer int - memcacheConfig MemcacheConfig -} - -// RegisterFlags adds the flags required to config this to the given FlagSet -func (cfg *CacheConfig) RegisterFlags(f *flag.FlagSet) { - f.DurationVar(&cfg.Expiration, "memcached.expiration", 0, "How long chunks stay in the memcache.") - f.IntVar(&cfg.WriteBackGoroutines, "memcache.write-back-goroutines", 10, "How many goroutines to use to write back to memcache.") - f.IntVar(&cfg.WriteBackBuffer, "memcache.write-back-buffer", 10000, "How many chunks to buffer for background write back.") - cfg.memcacheConfig.RegisterFlags(f) -} - -// Cache type caches chunks -type Cache struct { - cfg CacheConfig - memcache Memcache - - wg sync.WaitGroup - quit chan struct{} - bgWrites chan backgroundWrite -} - -type backgroundWrite struct { - key string - buf []byte -} - -// NewCache makes a new Cache -func NewCache(cfg CacheConfig) *Cache { - var memcache Memcache - if cfg.memcacheConfig.Host != "" { - memcache = NewMemcacheClient(cfg.memcacheConfig) - } - c := &Cache{ - cfg: cfg, - memcache: memcache, - quit: make(chan struct{}), - bgWrites: make(chan backgroundWrite, cfg.WriteBackBuffer), - } - 
c.wg.Add(cfg.WriteBackGoroutines) - for i := 0; i < cfg.WriteBackGoroutines; i++ { - go c.writeBackLoop() - } - return c -} - -// Stop the background flushing goroutines. -func (c *Cache) Stop() { - close(c.quit) - c.wg.Wait() -} - -func memcacheStatusCode(err error) string { - // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables - switch err { - case nil: - return "200" - case memcache.ErrCacheMiss: - return "404" - case memcache.ErrMalformedKey: - return "400" - default: - return "500" - } -} - -// FetchChunkData gets chunks from the chunk cache. -func (c *Cache) FetchChunkData(ctx context.Context, chunks []Chunk) (found []Chunk, missing []Chunk, err error) { - sp, ctx := ot.StartSpanFromContext(ctx, "FetchChunkData") - defer sp.Finish() - sp.LogFields(otlog.Int("chunks requested", len(chunks))) - - if c.memcache == nil { - return nil, chunks, nil - } - - memcacheRequests.Add(float64(len(chunks))) - - keys := make([]string, 0, len(chunks)) - for _, chunk := range chunks { - keys = append(keys, chunk.ExternalKey()) - } - - var items map[string]*memcache.Item - err = instrument.TimeRequestHistogramStatus(ctx, "Memcache.Get", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { - var err error - items, err = c.memcache.GetMulti(keys) - return err - }) - if err != nil { - return nil, chunks, err - } - - sp.LogFields(otlog.Int("chunks returned", len(items))) - decodeContext := NewDecodeContext() - for i, externalKey := range keys { - item, ok := items[externalKey] - if !ok { - missing = append(missing, chunks[i]) - continue - } - - if err := chunks[i].Decode(decodeContext, item.Value); err != nil { - memcacheCorrupt.Inc() - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "failed to decode chunk from cache", "err", err) - missing = append(missing, chunks[i]) - continue - } - - found = append(found, chunks[i]) - } - sp.LogFields(otlog.Int("chunks found", len(found)), otlog.Int("chunks missing", len(missing))) 
- - memcacheHits.Add(float64(len(found))) - return found, missing, nil -} - -// StoreChunk serializes and stores a chunk in the chunk cache. -func (c *Cache) StoreChunk(ctx context.Context, key string, buf []byte) error { - if c.memcache == nil { - return nil - } - - return instrument.TimeRequestHistogramStatus(ctx, "Memcache.Put", memcacheRequestDuration, memcacheStatusCode, func(_ context.Context) error { - item := memcache.Item{ - Key: key, - Value: buf, - Expiration: int32(c.cfg.Expiration.Seconds()), - } - return c.memcache.Set(&item) - }) -} - -// BackgroundWrite writes chunks for the cache in the background -func (c *Cache) BackgroundWrite(key string, buf []byte) { - bgWrite := backgroundWrite{ - key: key, - buf: buf, - } - select { - case c.bgWrites <- bgWrite: - default: - memcacheDroppedWriteBack.Inc() - } -} - -func (c *Cache) writeBackLoop() { - defer c.wg.Done() - - for { - select { - case bgWrite := <-c.bgWrites: - err := c.StoreChunk(context.Background(), bgWrite.key, bgWrite.buf) - if err != nil { - level.Error(util.Logger).Log("msg", "error writing to memcache", "err", err) - } - case <-c.quit: - return - } - } -} diff --git a/pkg/chunk/chunk_cache_test.go b/pkg/chunk/chunk_cache_test.go deleted file mode 100644 index 09144ce5826..00000000000 --- a/pkg/chunk/chunk_cache_test.go +++ /dev/null @@ -1,114 +0,0 @@ -package chunk - -import ( - "math/rand" - "sync" - "testing" - - "github.com/bradfitz/gomemcache/memcache" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/require" - "github.com/weaveworks/cortex/pkg/prom1/storage/local/chunk" - "golang.org/x/net/context" -) - -type mockMemcache struct { - sync.RWMutex - contents map[string][]byte -} - -func newMockMemcache() *mockMemcache { - return &mockMemcache{ - contents: map[string][]byte{}, - } -} - -func (m *mockMemcache) GetMulti(keys []string) (map[string]*memcache.Item, error) { - m.RLock() - defer m.RUnlock() - result := map[string]*memcache.Item{} - for _, k := range keys { 
- if c, ok := m.contents[k]; ok { - result[k] = &memcache.Item{ - Value: c, - } - } - } - return result, nil -} - -func (m *mockMemcache) Set(item *memcache.Item) error { - m.Lock() - defer m.Unlock() - m.contents[item.Key] = item.Value - return nil -} - -func TestChunkCache(t *testing.T) { - c := Cache{ - memcache: newMockMemcache(), - } - - const ( - chunkLen = 13 * 3600 // in seconds - ) - - // put 100 chunks from 0 to 99 - keys := []string{} - chunks := []Chunk{} - for i := 0; i < 100; i++ { - ts := model.TimeFromUnix(int64(i * chunkLen)) - promChunk, _ := chunk.New().Add(model.SamplePair{ - Timestamp: ts, - Value: model.SampleValue(i), - }) - chunk := NewChunk( - userID, - model.Fingerprint(1), - model.Metric{ - model.MetricNameLabel: "foo", - "bar": "baz", - }, - promChunk[0], - ts, - ts.Add(chunkLen), - ) - - buf, err := chunk.Encode() - require.NoError(t, err) - - key := chunk.ExternalKey() - err = c.StoreChunk(context.Background(), key, buf) - require.NoError(t, err) - - keys = append(keys, key) - chunks = append(chunks, chunk) - } - - for i := 0; i < 100; i++ { - index := rand.Intn(len(keys)) - key := keys[index] - - chunk, err := parseExternalKey(userID, key) - require.NoError(t, err) - - found, missing, err := c.FetchChunkData(context.Background(), []Chunk{chunk}) - require.NoError(t, err) - require.Empty(t, missing) - require.Len(t, found, 1) - require.Equal(t, chunks[index], found[0]) - } - - // test getting them all - receivedChunks := []Chunk{} - for i := 0; i < len(keys); i++ { - chunk, err := parseExternalKey(userID, keys[i]) - require.NoError(t, err) - receivedChunks = append(receivedChunks, chunk) - } - found, missing, err := c.FetchChunkData(context.Background(), receivedChunks) - require.NoError(t, err) - require.Empty(t, missing) - require.Len(t, found, len(keys)) - require.Equal(t, chunks, receivedChunks) -} diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index ac421f86309..b57b53caff0 100644 --- a/pkg/chunk/chunk_store.go 
+++ b/pkg/chunk/chunk_store.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/chunk/cache" "github.com/weaveworks/cortex/pkg/util" ) @@ -33,16 +34,22 @@ var ( }, HashBuckets: 1024, }) + cacheCorrupt = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "cache_corrupt_chunks_total", + Help: "Total count of corrupt chunks found in cache.", + }) ) func init() { prometheus.MustRegister(indexEntriesPerChunk) prometheus.MustRegister(rowWrites) + prometheus.MustRegister(cacheCorrupt) } // StoreConfig specifies config for a ChunkStore type StoreConfig struct { - CacheConfig + CacheConfig cache.Config // For injecting different schemas in tests. schemaFactory func(cfg SchemaConfig) Schema @@ -58,7 +65,7 @@ type Store struct { cfg StoreConfig storage StorageClient - cache *Cache + cache cache.Cache schema Schema } @@ -75,11 +82,16 @@ func NewStore(cfg StoreConfig, schemaCfg SchemaConfig, storage StorageClient) (* return nil, err } + cache, err := cache.New(cfg.CacheConfig) + if err != nil { + return nil, err + } + return &Store{ cfg: cfg, storage: storage, schema: schema, - cache: NewCache(cfg.CacheConfig), + cache: cache, }, nil } @@ -196,15 +208,22 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim // Filter out chunks that are not in the selected time range. 
filtered := make([]Chunk, 0, len(chunks)) + keys := make([]string, 0, len(chunks)) for _, chunk := range chunks { if chunk.Through < from || through < chunk.From { continue } filtered = append(filtered, chunk) + keys = append(keys, chunk.ExternalKey()) } // Now fetch the actual chunk data from Memcache / S3 - fromCache, missing, err := c.cache.FetchChunkData(ctx, filtered) + cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) + if err != nil { + level.Warn(logger).Log("msg", "error fetching from cache", "err", err) + } + + fromCache, missing, err := ProcessCacheResponse(filtered, cacheHits, cacheBufs) if err != nil { level.Warn(logger).Log("msg", "error fetching from cache", "err", err) } @@ -240,6 +259,41 @@ outer: return filteredChunks, nil } +// ProcessCacheResponse decodes the chunks coming back from the cache, separating +// hits and misses. +func ProcessCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) (found []Chunk, missing []Chunk, err error) { + decodeContext := NewDecodeContext() + + i, j := 0, 0 + for i < len(chunks) && j < len(keys) { + chunkKey := chunks[i].ExternalKey() + + if chunkKey < keys[j] { + missing = append(missing, chunks[i]) + i++ + } else if chunkKey > keys[j] { + level.Debug(util.Logger).Log("msg", "got chunk from cache we didn't ask for") + j++ + } else { + chunk := chunks[i] + err = chunk.Decode(decodeContext, bufs[j]) + if err != nil { + cacheCorrupt.Inc() + return + } + found = append(found, chunk) + i++ + j++ + } + } + + for ; i < len(chunks); i++ { + missing = append(missing, chunks[i]) + } + + return +} + func (c *Store) getSeriesMatrix(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricNameMatcher *labels.Matcher) (model.Matrix, error) { // Get all series from the index userID, err := user.ExtractOrgID(ctx) @@ -466,13 +520,15 @@ func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []Index return unique(chunkSet), nil } -func (c *Store) writeBackCache(_ 
// writeBackCache serializes the given chunks and writes them to the chunk
// cache so later reads can be served without hitting the backing store.
// The first encode or store error aborts the write-back and is returned.
func (c *Store) writeBackCache(ctx context.Context, chunks []Chunk) error {
	for i := range chunks {
		// Index-based access keeps Encode operating on the slice element
		// itself rather than a copy.
		encoded, err := chunks[i].Encode()
		if err != nil {
			return err
		}
		// NOTE(review): c.cache may be a background cache, in which case
		// StoreChunk enqueues asynchronously and only reports a drop via a
		// metric — confirm against the configured cache implementation.
		if err := c.cache.StoreChunk(ctx, chunks[i].ExternalKey(), encoded); err != nil {
			return err
		}
	}
	return nil
}