From 5b76d192a99a3e37cf83403695a832232d33a637 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 21 Aug 2018 13:53:43 +0530 Subject: [PATCH 1/8] Expose fifoCache Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/fifo_cache.go | 15 +++++++++------ pkg/chunk/fifo_cache_test.go | 18 +++++++++--------- pkg/chunk/series_store.go | 8 ++++---- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pkg/chunk/fifo_cache.go b/pkg/chunk/fifo_cache.go index 3a0bb8526f5..93d9cd9d485 100644 --- a/pkg/chunk/fifo_cache.go +++ b/pkg/chunk/fifo_cache.go @@ -5,9 +5,9 @@ import ( "time" ) -// fifoCache is a simple string -> interface{} cache which uses a fifo slide to +// FifoCache is a simple string -> interface{} cache which uses a fifo slide to // manage evictions. O(1) inserts and updates, O(1) gets. -type fifoCache struct { +type FifoCache struct { lock sync.RWMutex size int entries []cacheEntry @@ -24,15 +24,17 @@ type cacheEntry struct { prev, next int } -func newFifoCache(size int) *fifoCache { - return &fifoCache{ +// NewFifoCache returns a new initialised FifoCache of size. +func NewFifoCache(size int) *FifoCache { + return &FifoCache{ size: size, entries: make([]cacheEntry, 0, size), index: make(map[string]int, size), } } -func (c *fifoCache) put(key string, value interface{}) { +// Put stores the value against the key. +func (c *FifoCache) Put(key string, value interface{}) { if c.size == 0 { return } @@ -95,7 +97,8 @@ func (c *fifoCache) put(key string, value interface{}) { c.index[key] = index } -func (c *fifoCache) get(key string) (value interface{}, updated time.Time, ok bool) { +// Get returns the stored value against the key and when the key was last updated. +func (c *FifoCache) Get(key string) (value interface{}, updated time.Time, ok bool) { if c.size == 0 { return } diff --git a/pkg/chunk/fifo_cache_test.go b/pkg/chunk/fifo_cache_test.go index 1beae800a5d..62625d7397a 100644 --- a/pkg/chunk/fifo_cache_test.go +++ b/pkg/chunk/fifo_cache_test.go @@ -12,56 +12,56 @@ const size = 10 const overwrite = 5 func TestFifoCache(t *testing.T) { - c := newFifoCache(size) + c := NewFifoCache(size) // Check put / get works for i := 0; i < size; i++ { - c.put(strconv.Itoa(i), i) + c.Put(strconv.Itoa(i), i) //c.print() } require.Len(t, c.index, size) require.Len(t, c.entries, size) for i := 0; i < size; i++ { - value, _, ok := c.get(strconv.Itoa(i)) + value, _, ok := c.Get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } // Check evictions for i := size; i < size+overwrite; i++ { - c.put(strconv.Itoa(i), i) + c.Put(strconv.Itoa(i), i) //c.print() } require.Len(t, c.index, size) require.Len(t, c.entries, size) for i := 0; i < size-overwrite; i++ { - _, _, ok := c.get(strconv.Itoa(i)) + _, _, ok := c.Get(strconv.Itoa(i)) require.False(t, ok) } for i := size; i < size+overwrite; i++ { - value, _, ok := c.get(strconv.Itoa(i)) + value, _, ok := c.Get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } // Check updates work for i := size; i < size+overwrite; i++ { - c.put(strconv.Itoa(i), i*2) + c.Put(strconv.Itoa(i), i*2) //c.print() } require.Len(t, c.index, size) require.Len(t, c.entries, size) for i := size; i < size+overwrite; i++ { - value, _, ok := c.get(strconv.Itoa(i)) + value, _, ok := c.Get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i*2, value.(int)) } } -func (c *fifoCache) print() { +func (c *FifoCache) print() { fmt.Println("first", c.first, "last", c.last) for i, entry := range c.entries { fmt.Printf(" %d -> key: %s, value: %v, next: %d, prev: %d\n", i, entry.key, entry.value, entry.next, entry.prev) diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 3f007844330..a1280846c38 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -22,7 +22,7 @@ var ( // seriesStore implements Store type seriesStore struct { store - cardinalityCache *fifoCache + cardinalityCache *FifoCache } func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { @@ -38,7 +38,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor schema: schema, chunkFetcher: fetcher, }, - cardinalityCache: newFifoCache(cfg.CardinalityCacheSize), + cardinalityCache: NewFifoCache(cfg.CardinalityCacheSize), }, nil } @@ -182,7 +182,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, level.Debug(log).Log("queries", len(queries)) for _, query := range queries { - value, updated, ok := c.cardinalityCache.get(query.HashValue) + value, updated, ok := c.cardinalityCache.Get(query.HashValue) if !ok { continue } @@ -201,7 +201,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, // TODO This is not correct, will overcount for queries > 24hrs for _, query := range queries { - c.cardinalityCache.put(query.HashValue, len(entries)) + c.cardinalityCache.Put(query.HashValue, len(entries)) } if len(entries) > c.cfg.CardinalityLimit { return nil, errCardinalityExceeded From c5ddc33e6439aa22494f6a0d1045d273852ce323 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 21 Aug 2018 14:18:59 +0530 Subject: [PATCH 2/8] Implement caching of queries Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/storage/caching_storage_client.go | 63 +++++++++++++++++++++ pkg/chunk/storage/factory.go | 22 +++++-- 2 files changed, 79 insertions(+), 6 deletions(-) create mode 100644 pkg/chunk/storage/caching_storage_client.go diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go new file mode 100644 index 00000000000..bb7f9d57311 --- /dev/null +++ b/pkg/chunk/storage/caching_storage_client.go @@ -0,0 +1,63 @@ +package storage + +import ( + "context" + "time" + + "github.com/weaveworks/cortex/pkg/chunk" +) + +type cachingStorageClient struct { + chunk.StorageClient + cache *chunk.FifoCache + validity time.Duration +} + +func newCachingStorageClient(client chunk.StorageClient, size int, validity time.Duration) chunk.StorageClient { + if size == 0 { + return client + } + + return &cachingStorageClient{ + StorageClient: client, + cache: chunk.NewFifoCache(size), + validity: validity, + } +} + +func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { + value, updated, ok := s.cache.Get(queryKey(query)) + if ok && time.Now().Sub(updated) < s.validity { + batches := value.([]chunk.ReadBatch) + for _, batch := range batches { + callback(batch) + } + + return nil + } + + readBatches := []chunk.ReadBatch{} + err := s.StorageClient.QueryPages(ctx, query, copyingCallback(&readBatches, callback)) + if err != nil { + return err + } + + s.cache.Put(queryKey(query), readBatches) + return nil +} + +func copyingCallback(readBatches *[]chunk.ReadBatch, cb func(chunk.ReadBatch) bool) func(chunk.ReadBatch) bool { + return func(result chunk.ReadBatch) bool { + *readBatches = append(*readBatches, result) + return cb(result) + } +} + +func queryKey(q chunk.IndexQuery) string { + const sep = "\xff" + return q.TableName + sep + + q.HashValue + sep + + string(q.RangeValuePrefix) + sep + + string(q.RangeValueStart) + sep + + string(q.ValueEqual) +} diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index b48b0d2f502..c40f28bccfa 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -5,6 +5,7 @@ import ( "flag" "fmt" "strings" + "time" "github.com/go-kit/kit/log/level" "github.com/weaveworks/cortex/pkg/chunk" @@ -20,6 +21,9 @@ type Config struct { AWSStorageConfig aws.StorageConfig GCPStorageConfig gcp.Config CassandraStorageConfig cassandra.Config + + IndexCacheSize int + IndexCacheValidity time.Duration } // RegisterFlags adds the flags required to configure this flag set. @@ -28,26 +32,32 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.AWSStorageConfig.RegisterFlags(f) cfg.GCPStorageConfig.RegisterFlags(f) cfg.CassandraStorageConfig.RegisterFlags(f) + + f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable.") + f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 15*time.Minute, "Period for which entries in the index cache are valid.") } // NewStorageClient makes a storage client based on the configuration. -func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (chunk.StorageClient, error) { +func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.StorageClient, err error) { switch cfg.StorageClient { case "inmemory": - return chunk.NewMockStorage(), nil + client, err = chunk.NewMockStorage(), nil case "aws": path := strings.TrimPrefix(cfg.AWSStorageConfig.DynamoDB.URL.Path, "/") if len(path) > 0 { level.Warn(util.Logger).Log("msg", "ignoring DynamoDB URL path", "path", path) } - return aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg) + client, err = aws.NewStorageClient(cfg.AWSStorageConfig, schemaCfg) case "gcp": - return gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) + client, err = gcp.NewStorageClient(context.Background(), cfg.GCPStorageConfig, schemaCfg) case "cassandra": - return cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) + client, err = cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) default: - return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, gcp, cassandra, inmemory", cfg.StorageClient) + client, err = nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, gcp, cassandra, inmemory", cfg.StorageClient) } + + client = newCachingStorageClient(client, cfg.IndexCacheSize, cfg.IndexCacheValidity) + return } // NewTableClient makes a new table client based on the configuration. From fab87dad8978ec7eb13b4d747f82b71ad0314f9a Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 21 Aug 2018 15:48:47 +0530 Subject: [PATCH 3/8] Instrument FifoCache and add expiry * Instrument the FifoCache * Move expiry checks into the cache itself Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/fifo_cache.go | 104 +++++++++++++++++--- pkg/chunk/fifo_cache_test.go | 26 ++++- pkg/chunk/series_store.go | 8 +- pkg/chunk/storage/caching_storage_client.go | 82 ++++++++++++--- 4 files changed, 179 insertions(+), 41 deletions(-) diff --git a/pkg/chunk/fifo_cache.go b/pkg/chunk/fifo_cache.go index 93d9cd9d485..70b68f804e0 100644 --- a/pkg/chunk/fifo_cache.go +++ b/pkg/chunk/fifo_cache.go @@ -3,18 +3,73 @@ package chunk import ( "sync" "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + cacheEntriesAdded = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "querier", + Subsystem: "cache", + Name: "added_total", + Help: "The total number of Put calls on the cache", + }, []string{"cache"}) + + cacheEntriesAddedNew = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "querier", + Subsystem: "cache", + Name: "added_new_total", + Help: "The total number of new entries added to the cache", + }, []string{"cache"}) + + cacheEntriesEvicted = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "querier", + Subsystem: "cache", + Name: "evicted_total", + Help: "The total number of evicted entries", + }, []string{"cache"}) + + cacheTotalGets = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "querier", + Subsystem: "cache", + Name: "gets_total", + Help: "The total number of Get calls", + }, []string{"cache"}) + + cacheTotalMisses = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "querier", + Subsystem: "cache", + Name: "misses_total", + Help: "The total number of Get calls that had no valid entry", + }, []string{"cache"}) + + cacheStaleGets = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "querier", + Subsystem: "cache", + Name: "stale_gets_total", + Help: "The total number of Get calls that had an entry which expired", + }, []string{"cache"}) ) // FifoCache is a simple string -> interface{} cache which uses a fifo slide to // manage evictions. O(1) inserts and updates, O(1) gets. type FifoCache struct { - lock sync.RWMutex - size int - entries []cacheEntry - index map[string]int + lock sync.RWMutex + size int + validity time.Duration + entries []cacheEntry + index map[string]int // indexes into entries to identify the most recent and least recent entry. first, last int + + entriesAdded prometheus.Counter + entriesAddedNew prometheus.Counter + entriesEvicted prometheus.Counter + totalGets prometheus.Counter + totalMisses prometheus.Counter + staleGets prometheus.Counter } type cacheEntry struct { @@ -25,16 +80,25 @@ type cacheEntry struct { } // NewFifoCache returns a new initialised FifoCache of size. -func NewFifoCache(size int) *FifoCache { +func NewFifoCache(name string, size int, validity time.Duration) *FifoCache { return &FifoCache{ - size: size, - entries: make([]cacheEntry, 0, size), - index: make(map[string]int, size), + size: size, + validity: validity, + entries: make([]cacheEntry, 0, size), + index: make(map[string]int, size), + + entriesAdded: cacheEntriesAdded.WithLabelValues(name), + entriesAddedNew: cacheEntriesAddedNew.WithLabelValues(name), + entriesEvicted: cacheEntriesEvicted.WithLabelValues(name), + totalGets: cacheTotalGets.WithLabelValues(name), + totalMisses: cacheTotalMisses.WithLabelValues(name), + staleGets: cacheStaleGets.WithLabelValues(name), } } // Put stores the value against the key. func (c *FifoCache) Put(key string, value interface{}) { + c.entriesAdded.Inc() if c.size == 0 { return } @@ -64,9 +128,11 @@ func (c *FifoCache) Put(key string, value interface{}) { c.entries[index] = entry return } + c.entriesAddedNew.Inc() // Otherwise, see if we need to evict an entry. if len(c.entries) >= c.size { + c.entriesEvicted.Inc() index = c.last entry := c.entries[index] @@ -98,19 +164,27 @@ func (c *FifoCache) Put(key string, value interface{}) { } // Get returns the stored value against the key and when the key was last updated. -func (c *FifoCache) Get(key string) (value interface{}, updated time.Time, ok bool) { +func (c *FifoCache) Get(key string) (interface{}, bool) { + c.totalGets.Inc() if c.size == 0 { - return + return nil, false } c.lock.RLock() defer c.lock.RUnlock() - var index int - index, ok = c.index[key] + index, ok := c.index[key] if ok { - value = c.entries[index].value - updated = c.entries[index].updated + updated := c.entries[index].updated + if time.Now().Sub(updated) < c.validity { + return c.entries[index].value, true + } + + c.totalMisses.Inc() + c.staleGets.Inc() + return nil, false } - return + + c.totalMisses.Inc() + return nil, false } diff --git a/pkg/chunk/fifo_cache_test.go b/pkg/chunk/fifo_cache_test.go index 62625d7397a..de44754beb6 100644 --- a/pkg/chunk/fifo_cache_test.go +++ b/pkg/chunk/fifo_cache_test.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -12,7 +13,7 @@ const size = 10 const overwrite = 5 func TestFifoCache(t *testing.T) { - c := NewFifoCache(size) + c := NewFifoCache("test", size, 1*time.Minute) // Check put / get works for i := 0; i < size; i++ { @@ -23,7 +24,7 @@ func TestFifoCache(t *testing.T) { require.Len(t, c.entries, size) for i := 0; i < size; i++ { - value, _, ok := c.Get(strconv.Itoa(i)) + value, ok := c.Get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } @@ -37,11 +38,11 @@ func TestFifoCache(t *testing.T) { require.Len(t, c.entries, size) for i := 0; i < size-overwrite; i++ { - _, _, ok := c.Get(strconv.Itoa(i)) + _, ok := c.Get(strconv.Itoa(i)) require.False(t, ok) } for i := size; i < size+overwrite; i++ { - value, _, ok := c.Get(strconv.Itoa(i)) + value, ok := c.Get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } @@ -55,12 +56,27 @@ func TestFifoCache(t *testing.T) { require.Len(t, c.entries, size) for i := size; i < size+overwrite; i++ { - value, _, ok := c.Get(strconv.Itoa(i)) + value, ok := c.Get(strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i*2, value.(int)) } } +func TestFifoCacheExpiry(t *testing.T) { + c := NewFifoCache("test", size, 5*time.Millisecond) + + c.Put("0", 0) + + value, ok := c.Get("0") + require.True(t, ok) + require.Equal(t, 0, value.(int)) + + // Expire the entry. + time.Sleep(5 * time.Millisecond) + _, ok = c.Get(strconv.Itoa(0)) + require.False(t, ok) +} + func (c *FifoCache) print() { fmt.Println("first", c.first, "last", c.last) for i, entry := range c.entries { diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index a1280846c38..54e91a9e055 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "time" "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" @@ -38,7 +37,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor schema: schema, chunkFetcher: fetcher, }, - cardinalityCache: NewFifoCache(cfg.CardinalityCacheSize), + cardinalityCache: NewFifoCache("cardinality", cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity), }, nil } @@ -182,13 +181,12 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, level.Debug(log).Log("queries", len(queries)) for _, query := range queries { - value, updated, ok := c.cardinalityCache.Get(query.HashValue) + value, ok := c.cardinalityCache.Get(query.HashValue) if !ok { continue } - entryAge := time.Now().Sub(updated) cardinality := value.(int) - if entryAge < c.cfg.CardinalityCacheValidity && cardinality > c.cfg.CardinalityLimit { + if cardinality > c.cfg.CardinalityLimit { return nil, errCardinalityExceeded } } diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index bb7f9d57311..07dcceae950 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -1,7 +1,9 @@ package storage import ( + "bytes" "context" + "strings" "time" "github.com/weaveworks/cortex/pkg/chunk" @@ -20,44 +22,92 @@ func newCachingStorageClient(client chunk.StorageClient, size int, validity time return &cachingStorageClient{ StorageClient: client, - cache: chunk.NewFifoCache(size), + cache: chunk.NewFifoCache("index", size, validity), validity: validity, } } func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - value, updated, ok := s.cache.Get(queryKey(query)) - if ok && time.Now().Sub(updated) < s.validity { + value, ok := s.cache.Get(queryKey(query)) + if ok { batches := value.([]chunk.ReadBatch) - for _, batch := range batches { - callback(batch) - } + filteredBatch := filterBatchByQuery(query, batches) + callback(filteredBatch) return nil } - readBatches := []chunk.ReadBatch{} - err := s.StorageClient.QueryPages(ctx, query, copyingCallback(&readBatches, callback)) + batches := []chunk.ReadBatch{} + cacheableQuery := chunk.IndexQuery{ + TableName: query.TableName, + HashValue: query.HashValue, + } // Just reads the entire row and caches it. + + err := s.StorageClient.QueryPages(ctx, cacheableQuery, copyingCallback(&batches)) if err != nil { return err } - s.cache.Put(queryKey(query), readBatches) + filteredBatch := filterBatchByQuery(query, batches) + callback(filteredBatch) + + s.cache.Put(queryKey(query), batches) + return nil } -func copyingCallback(readBatches *[]chunk.ReadBatch, cb func(chunk.ReadBatch) bool) func(chunk.ReadBatch) bool { +type readBatch []cell + +func (b readBatch) Len() int { return len(b) } +func (b readBatch) RangeValue(i int) []byte { return b[i].column } +func (b readBatch) Value(i int) []byte { return b[i].value } + +type cell struct { + column []byte + value []byte +} + +func copyingCallback(readBatches *[]chunk.ReadBatch) func(chunk.ReadBatch) bool { return func(result chunk.ReadBatch) bool { *readBatches = append(*readBatches, result) - return cb(result) + return true } } func queryKey(q chunk.IndexQuery) string { const sep = "\xff" - return q.TableName + sep + - q.HashValue + sep + - string(q.RangeValuePrefix) + sep + - string(q.RangeValueStart) + sep + - string(q.ValueEqual) + return q.TableName + sep + q.HashValue +} + +func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readBatch { + var filter func([]byte, []byte) bool + + if len(query.RangeValuePrefix) != 0 { + filter = func(rangeValue []byte, value []byte) bool { + return strings.HasPrefix(string(rangeValue), string(query.RangeValuePrefix)) + } + } + if len(query.RangeValueStart) != 0 { + filter = func(rangeValue []byte, value []byte) bool { + return string(rangeValue) >= string(query.RangeValueStart) + } + } + if len(query.ValueEqual) != 0 { + // This is on top of the existing filters. + existingFilter := filter + filter = func(rangeValue []byte, value []byte) bool { + return existingFilter(rangeValue, value) && bytes.Equal(value, query.ValueEqual) + } + } + + finalBatch := make(readBatch, 0, len(batches)) // On the higher side for most queries. On the lower side for column key schema. + for _, batch := range batches { + for i := 0; i < batch.Len(); i++ { + if filter(batch.RangeValue(i), batch.Value(i)) { + finalBatch = append(finalBatch, cell{column: batch.RangeValue(i), value: batch.Value(i)}) + } + } + } + + return finalBatch } From 4012bed05fef306da0f5b11927828206907609de Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 21 Aug 2018 23:01:22 +0530 Subject: [PATCH 4/8] Add tests for caching store Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/storage/caching_fixtures.go | 27 +++++++++++ pkg/chunk/storage/caching_storage_client.go | 2 +- pkg/chunk/storage/index_test.go | 53 ++++++++++++--------- pkg/chunk/storage/utils_test.go | 1 + 4 files changed, 60 insertions(+), 23 deletions(-) create mode 100644 pkg/chunk/storage/caching_fixtures.go diff --git a/pkg/chunk/storage/caching_fixtures.go b/pkg/chunk/storage/caching_fixtures.go new file mode 100644 index 00000000000..35b6bb0c469 --- /dev/null +++ b/pkg/chunk/storage/caching_fixtures.go @@ -0,0 +1,27 @@ +package storage + +import ( + "time" + + "github.com/weaveworks/cortex/pkg/chunk/gcp" + + "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/testutils" +) + +type fixture struct { + fixture testutils.Fixture +} + +func (f fixture) Name() string { return "caching-store" } +func (f fixture) Clients() (chunk.StorageClient, chunk.TableClient, chunk.SchemaConfig, error) { + storageClient, tableClient, schemaConfig, err := f.fixture.Clients() + client := newCachingStorageClient(storageClient, 500, 5*time.Minute) + return client, tableClient, schemaConfig, err +} +func (f fixture) Teardown() error { return f.fixture.Teardown() } + +// Fixtures for unit testing the caching storage. +var Fixtures = []testutils.Fixture{ + fixture{gcp.Fixtures[0]}, +} diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 07dcceae950..5b1f71a8a00 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -80,7 +80,7 @@ func queryKey(q chunk.IndexQuery) string { } func filterBatchByQuery(query chunk.IndexQuery, batches []chunk.ReadBatch) readBatch { - var filter func([]byte, []byte) bool + filter := func([]byte, []byte) bool { return true } if len(query.RangeValuePrefix) != 0 { filter = func(rangeValue []byte, value []byte) bool { diff --git a/pkg/chunk/storage/index_test.go b/pkg/chunk/storage/index_test.go index b6cdc0f172c..91d8532bb96 100644 --- a/pkg/chunk/storage/index_test.go +++ b/pkg/chunk/storage/index_test.go @@ -104,10 +104,10 @@ func TestQueryPages(t *testing.T) { require.NoError(t, err) tests := []struct { - name string - query chunk.IndexQuery - provisionedErr int - want []chunk.IndexEntry + name string + query chunk.IndexQuery + repeat bool + want []chunk.IndexEntry }{ { "check HashValue only", @@ -115,7 +115,7 @@ func TestQueryPages(t *testing.T) { TableName: tableName, HashValue: "flip", }, - 0, + false, []chunk.IndexEntry{entries[5], entries[6], entries[7]}, }, { @@ -125,7 +125,7 @@ func TestQueryPages(t *testing.T) { HashValue: "foo", RangeValueStart: []byte("bar:2"), }, - 0, + false, []chunk.IndexEntry{entries[1], entries[2], entries[3], entries[4]}, }, { @@ -135,7 +135,7 @@ func TestQueryPages(t *testing.T) { HashValue: "foo", RangeValuePrefix: []byte("baz:"), }, - 0, + false, []chunk.IndexEntry{entries[3], entries[4]}, }, { @@ -146,7 +146,7 @@ func TestQueryPages(t *testing.T) { RangeValuePrefix: []byte("bar"), ValueEqual: []byte("20"), }, - 0, + false, []chunk.IndexEntry{entries[1]}, }, { @@ -157,27 +157,36 @@ func TestQueryPages(t *testing.T) { RangeValuePrefix: []byte("bar"), ValueEqual: []byte("20"), }, - 2, + true, []chunk.IndexEntry{entries[1]}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var have []chunk.IndexEntry - err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { - for i := 0; i < read.Len(); i++ { - have = append(have, chunk.IndexEntry{ - TableName: tt.query.TableName, - HashValue: tt.query.HashValue, - RangeValue: read.RangeValue(i), - Value: read.Value(i), - }) + run := true + for run { + var have []chunk.IndexEntry + err = client.QueryPages(context.Background(), tt.query, func(read chunk.ReadBatch) bool { + for i := 0; i < read.Len(); i++ { + have = append(have, chunk.IndexEntry{ + TableName: tt.query.TableName, + HashValue: tt.query.HashValue, + RangeValue: read.RangeValue(i), + Value: read.Value(i), + }) + } + return true + }) + require.NoError(t, err) + require.Equal(t, tt.want, have) + + if tt.repeat { + tt.repeat = false + } else { + run = false } - return true - }) - require.NoError(t, err) - require.Equal(t, tt.want, have) + } }) } }) diff --git a/pkg/chunk/storage/utils_test.go b/pkg/chunk/storage/utils_test.go index 0dda70ab42b..c1500f655c9 100644 --- a/pkg/chunk/storage/utils_test.go +++ b/pkg/chunk/storage/utils_test.go @@ -20,6 +20,7 @@ type storageClientTest func(*testing.T, chunk.StorageClient) func forAllFixtures(t *testing.T, storageClientTest storageClientTest) { fixtures := append(aws.Fixtures, gcp.Fixtures...) + fixtures = append(fixtures, Fixtures...) cassandraFixtures, err := cassandra.Fixtures() require.NoError(t, err) From 983031a32dd626a9d6402496f448584e53918527 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Thu, 23 Aug 2018 17:20:14 +0530 Subject: [PATCH 5/8] Add tracing for the fifo cache Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/fifo_cache.go | 18 ++++++++++++++-- pkg/chunk/fifo_cache_test.go | 23 ++++++++++++--------- pkg/chunk/series_store.go | 4 ++-- pkg/chunk/storage/caching_storage_client.go | 4 ++-- 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/pkg/chunk/fifo_cache.go b/pkg/chunk/fifo_cache.go index 70b68f804e0..9bfe9d7161c 100644 --- a/pkg/chunk/fifo_cache.go +++ b/pkg/chunk/fifo_cache.go @@ -1,9 +1,12 @@ package chunk import ( + "context" "sync" "time" + ot "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -64,6 +67,7 @@ type FifoCache struct { // indexes into entries to identify the most recent and least recent entry. first, last int + name string entriesAdded prometheus.Counter entriesAddedNew prometheus.Counter entriesEvicted prometheus.Counter @@ -87,6 +91,7 @@ func NewFifoCache(name string, size int, validity time.Duration) *FifoCache { entries: make([]cacheEntry, 0, size), index: make(map[string]int, size), + name: name, entriesAdded: cacheEntriesAdded.WithLabelValues(name), entriesAddedNew: cacheEntriesAddedNew.WithLabelValues(name), entriesEvicted: cacheEntriesEvicted.WithLabelValues(name), @@ -97,7 +102,10 @@ func NewFifoCache(name string, size int, validity time.Duration) *FifoCache { } // Put stores the value against the key. -func (c *FifoCache) Put(key string, value interface{}) { +func (c *FifoCache) Put(ctx context.Context, key string, value interface{}) { + span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-put") + defer span.Finish() + c.entriesAdded.Inc() if c.size == 0 { return @@ -164,7 +172,10 @@ func (c *FifoCache) Put(key string, value interface{}) { } // Get returns the stored value against the key and when the key was last updated. -func (c *FifoCache) Get(key string) (interface{}, bool) { +func (c *FifoCache) Get(ctx context.Context, key string) (interface{}, bool) { + span, ctx := ot.StartSpanFromContext(ctx, c.name+"-cache-get") + defer span.Finish() + c.totalGets.Inc() if c.size == 0 { return nil, false @@ -177,14 +188,17 @@ func (c *FifoCache) Get(key string) (interface{}, bool) { if ok { updated := c.entries[index].updated if time.Now().Sub(updated) < c.validity { + span.LogFields(otlog.Bool("hit", true)) return c.entries[index].value, true } c.totalMisses.Inc() c.staleGets.Inc() + span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", true)) return nil, false } + span.LogFields(otlog.Bool("hit", false), otlog.Bool("stale", false)) c.totalMisses.Inc() return nil, false } diff --git a/pkg/chunk/fifo_cache_test.go b/pkg/chunk/fifo_cache_test.go index de44754beb6..b058652bf59 100644 --- a/pkg/chunk/fifo_cache_test.go +++ b/pkg/chunk/fifo_cache_test.go @@ -1,6 +1,7 @@ package chunk import ( + "context" "fmt" "strconv" "testing" @@ -14,49 +15,50 @@ const overwrite = 5 func TestFifoCache(t *testing.T) { c := NewFifoCache("test", size, 1*time.Minute) + ctx := context.Background() // Check put / get works for i := 0; i < size; i++ { - c.Put(strconv.Itoa(i), i) + c.Put(ctx, strconv.Itoa(i), i) //c.print() } require.Len(t, c.index, size) require.Len(t, c.entries, size) for i := 0; i < size; i++ { - value, ok := c.Get(strconv.Itoa(i)) + value, ok := c.Get(ctx, strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } // Check evictions for i := size; i < size+overwrite; i++ { - c.Put(strconv.Itoa(i), i) + c.Put(ctx, strconv.Itoa(i), i) //c.print() } require.Len(t, c.index, size) require.Len(t, c.entries, size) for i := 0; i < size-overwrite; i++ { - _, ok := c.Get(strconv.Itoa(i)) + _, ok := c.Get(ctx, strconv.Itoa(i)) require.False(t, ok) } for i := size; i < size+overwrite; i++ { - value, ok := c.Get(strconv.Itoa(i)) + value, ok := c.Get(ctx, strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i, value.(int)) } // Check updates work for i := size; i < size+overwrite; i++ { - c.Put(strconv.Itoa(i), i*2) + c.Put(ctx, strconv.Itoa(i), i*2) //c.print() } require.Len(t, c.index, size) require.Len(t, c.entries, size) for i := size; i < size+overwrite; i++ { - value, ok := c.Get(strconv.Itoa(i)) + value, ok := c.Get(ctx, strconv.Itoa(i)) require.True(t, ok) require.Equal(t, i*2, value.(int)) } @@ -64,16 +66,17 @@ func TestFifoCache(t *testing.T) { func TestFifoCacheExpiry(t *testing.T) { c := NewFifoCache("test", size, 5*time.Millisecond) + ctx := context.Background() - c.Put("0", 0) + c.Put(ctx, "0", 0) - value, ok := c.Get("0") + value, ok := c.Get(ctx, "0") require.True(t, ok) require.Equal(t, 0, value.(int)) // Expire the entry. time.Sleep(5 * time.Millisecond) - _, ok = c.Get(strconv.Itoa(0)) + _, ok = c.Get(ctx, strconv.Itoa(0)) require.False(t, ok) } diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 54e91a9e055..a53cd4fb0bb 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -181,7 +181,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, level.Debug(log).Log("queries", len(queries)) for _, query := range queries { - value, ok := c.cardinalityCache.Get(query.HashValue) + value, ok := c.cardinalityCache.Get(ctx, query.HashValue) if !ok { continue } @@ -199,7 +199,7 @@ func (c *seriesStore) lookupSeriesByMetricNameMatcher(ctx context.Context, from, // TODO This is not correct, will overcount for queries > 24hrs for _, query := range queries { - c.cardinalityCache.Put(query.HashValue, len(entries)) + c.cardinalityCache.Put(ctx, query.HashValue, len(entries)) } if len(entries) > c.cfg.CardinalityLimit { return nil, errCardinalityExceeded diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 5b1f71a8a00..6f89028d6fa 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -28,7 +28,7 @@ func newCachingStorageClient(client chunk.StorageClient, size int, validity time } func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.IndexQuery, callback func(result chunk.ReadBatch) (shouldContinue bool)) error { - value, ok := s.cache.Get(queryKey(query)) + value, ok := s.cache.Get(ctx, queryKey(query)) if ok { batches := value.([]chunk.ReadBatch) filteredBatch := filterBatchByQuery(query, batches) @@ -51,7 +51,7 @@ func (s *cachingStorageClient) QueryPages(ctx context.Context, query chunk.Index filteredBatch := filterBatchByQuery(query, batches) callback(filteredBatch) - s.cache.Put(queryKey(query), batches) + s.cache.Put(ctx, queryKey(query), batches) return nil } From 0c87c55f687237659e56d6b81876cbe5a28b641a Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Thu, 23 Aug 2018 17:25:20 +0530 Subject: [PATCH 6/8] nit: Add that cache validity should be less than chunk idle time Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/storage/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index c40f28bccfa..e4c60f68c45 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -34,7 +34,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.CassandraStorageConfig.RegisterFlags(f) f.IntVar(&cfg.IndexCacheSize, "store.index-cache-size", 0, "Size of in-memory index cache, 0 to disable.") - f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 15*time.Minute, "Period for which entries in the index cache are valid.") + f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Period for which entries in the index cache are valid. Should be no higher than -ingester.max-chunk-idle.") } // NewStorageClient makes a storage client based on the configuration. From 870e2c96d3ffac94a7c65f21c91b4f144fedbf80 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Fri, 24 Aug 2018 21:56:34 +0530 Subject: [PATCH 7/8] review feedback Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/storage/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index e4c60f68c45..eddd7f2055d 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -53,7 +53,7 @@ func NewStorageClient(cfg Config, schemaCfg chunk.SchemaConfig) (client chunk.St case "cassandra": client, err = cassandra.NewStorageClient(cfg.CassandraStorageConfig, schemaCfg) default: - client, err = nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, gcp, cassandra, inmemory", cfg.StorageClient) + return nil, fmt.Errorf("Unrecognized storage client %v, choose one of: aws, gcp, cassandra, inmemory", cfg.StorageClient) } client = newCachingStorageClient(client, cfg.IndexCacheSize, cfg.IndexCacheValidity) From f6b0901e9e73ae1ad523c8cce03f680b05a6ff44 Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 28 Aug 2018 14:42:42 +0530 Subject: [PATCH 8/8] Move fifo_cache to pkg/chunk/cache Signed-off-by: Goutham Veeramachaneni --- pkg/chunk/{ => cache}/fifo_cache.go | 2 +- pkg/chunk/{ => cache}/fifo_cache_test.go | 2 +- pkg/chunk/series_store.go | 5 +++-- pkg/chunk/storage/caching_storage_client.go | 6 +++--- 4 files changed, 8 insertions(+), 7 deletions(-) rename pkg/chunk/{ => cache}/fifo_cache.go (99%) rename pkg/chunk/{ => cache}/fifo_cache_test.go (99%) diff --git a/pkg/chunk/fifo_cache.go b/pkg/chunk/cache/fifo_cache.go similarity index 99% rename from pkg/chunk/fifo_cache.go rename to pkg/chunk/cache/fifo_cache.go index 9bfe9d7161c..1eb5099ece8 100644 --- a/pkg/chunk/fifo_cache.go +++ b/pkg/chunk/cache/fifo_cache.go @@ -1,4 +1,4 @@ -package chunk +package cache import ( "context" diff --git a/pkg/chunk/fifo_cache_test.go b/pkg/chunk/cache/fifo_cache_test.go similarity index 99% rename from pkg/chunk/fifo_cache_test.go rename to pkg/chunk/cache/fifo_cache_test.go index b058652bf59..038103deebf 100644 --- a/pkg/chunk/fifo_cache_test.go +++ b/pkg/chunk/cache/fifo_cache_test.go @@ -1,4 +1,4 @@ -package chunk +package cache import ( "context" diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index a53cd4fb0bb..77e7fe14504 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" + "github.com/weaveworks/cortex/pkg/chunk/cache" "github.com/weaveworks/cortex/pkg/util" "github.com/weaveworks/cortex/pkg/util/extract" ) @@ -21,7 +22,7 @@ var ( // seriesStore implements Store type seriesStore struct { store - cardinalityCache *FifoCache + cardinalityCache *cache.FifoCache } func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Store, error) { @@ -37,7 +38,7 @@ func newSeriesStore(cfg StoreConfig, schema Schema, storage StorageClient) (Stor schema: schema, chunkFetcher: fetcher, }, - cardinalityCache: NewFifoCache("cardinality", cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity), + cardinalityCache: cache.NewFifoCache("cardinality", cfg.CardinalityCacheSize, cfg.CardinalityCacheValidity), }, nil } diff --git a/pkg/chunk/storage/caching_storage_client.go b/pkg/chunk/storage/caching_storage_client.go index 6f89028d6fa..5b58078b2af 100644 --- a/pkg/chunk/storage/caching_storage_client.go +++ b/pkg/chunk/storage/caching_storage_client.go @@ -7,11 +7,12 @@ import ( "time" "github.com/weaveworks/cortex/pkg/chunk" + "github.com/weaveworks/cortex/pkg/chunk/cache" ) type cachingStorageClient struct { chunk.StorageClient - cache *chunk.FifoCache + cache *cache.FifoCache validity time.Duration } @@ -22,8 +23,7 @@ func newCachingStorageClient(client chunk.StorageClient, size int, validity time return &cachingStorageClient{ StorageClient: client, - cache: chunk.NewFifoCache("index", size, validity), - validity: validity, + cache: cache.NewFifoCache("index", size, validity), } }