From e42d765f4ac110b21d0487de7f8069c8e8a32aee Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 10 Jul 2018 10:20:23 +0100 Subject: [PATCH 1/6] Prevent OOM in the chunk store. - Merge and dedupe set of strings, not sets of parse chunks. - Limit number of chunks fetched in a single query. - Add lots more debug logging to the querier to help track this all down. Signed-off-by: Tom Wilkie --- pkg/chunk/by_key.go | 128 ---------------------- pkg/chunk/by_key_test.go | 131 ----------------------- pkg/chunk/chunk.go | 5 + pkg/chunk/chunk_store.go | 115 ++++++++++++-------- pkg/chunk/chunk_store_test.go | 18 +++- pkg/chunk/storage/by_key_test.go | 21 ++++ pkg/chunk/storage/storage_client_test.go | 4 +- pkg/chunk/strings.go | 59 ++++++++++ 8 files changed, 174 insertions(+), 307 deletions(-) delete mode 100644 pkg/chunk/by_key.go delete mode 100644 pkg/chunk/by_key_test.go create mode 100644 pkg/chunk/storage/by_key_test.go create mode 100644 pkg/chunk/strings.go diff --git a/pkg/chunk/by_key.go b/pkg/chunk/by_key.go deleted file mode 100644 index c4674c1738c..00000000000 --- a/pkg/chunk/by_key.go +++ /dev/null @@ -1,128 +0,0 @@ -package chunk - -// ByKey allow you to sort chunks by ID -type ByKey []Chunk - -func (cs ByKey) Len() int { return len(cs) } -func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } -func (cs ByKey) Less(i, j int) bool { return lessByKey(cs[i], cs[j]) } - -// This comparison uses all the same information as Chunk.ExternalKey() -func lessByKey(a, b Chunk) bool { - return a.UserID < b.UserID || - (a.UserID == b.UserID && (a.Fingerprint < b.Fingerprint || - (a.Fingerprint == b.Fingerprint && (a.From < b.From || - (a.From == b.From && (a.Through < b.Through || - (a.Through == b.Through && a.Checksum < b.Checksum))))))) -} - -func equalByKey(a, b Chunk) bool { - return a.UserID == b.UserID && a.Fingerprint == b.Fingerprint && - a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum -} - -// unique will remove duplicates from the input. -// list must be sorted. -func unique(cs ByKey) ByKey { - if len(cs) == 0 { - return ByKey{} - } - - result := make(ByKey, 1, len(cs)) - result[0] = cs[0] - i, j := 0, 1 - for j < len(cs) { - if equalByKey(result[i], cs[j]) { - j++ - continue - } - result = append(result, cs[j]) - i++ - j++ - } - return result -} - -// merge will merge & dedupe two lists of chunks. -// list musts be sorted and not contain dupes. -func merge(a, b ByKey) ByKey { - result := make(ByKey, 0, len(a)+len(b)) - i, j := 0, 0 - for i < len(a) && j < len(b) { - if lessByKey(a[i], b[j]) { - result = append(result, a[i]) - i++ - } else if lessByKey(b[j], a[i]) { - result = append(result, b[j]) - j++ - } else { - result = append(result, a[i]) - i++ - j++ - } - } - for ; i < len(a); i++ { - result = append(result, a[i]) - } - for ; j < len(b); j++ { - result = append(result, b[j]) - } - return result -} - -// nWayUnion will merge and dedupe n lists of chunks. -// lists must be sorted and not contain dupes. -func nWayUnion(sets []ByKey) ByKey { - l := len(sets) - switch l { - case 0: - return ByKey{} - case 1: - return sets[0] - case 2: - return merge(sets[0], sets[1]) - default: - var ( - split = l / 2 - left = nWayUnion(sets[:split]) - right = nWayUnion(sets[split:]) - ) - return nWayUnion([]ByKey{left, right}) - } -} - -// nWayIntersect will interesct n sorted lists of chunks. -func nWayIntersect(sets []ByKey) ByKey { - l := len(sets) - switch l { - case 0: - return ByKey{} - case 1: - return sets[0] - case 2: - var ( - left, right = sets[0], sets[1] - i, j = 0, 0 - result = []Chunk{} - ) - for i < len(left) && j < len(right) { - if equalByKey(left[i], right[j]) { - result = append(result, left[i]) - } - - if lessByKey(left[i], right[j]) { - i++ - } else { - j++ - } - } - return result - default: - var ( - split = l / 2 - left = nWayIntersect(sets[:split]) - right = nWayIntersect(sets[split:]) - ) - return nWayIntersect([]ByKey{left, right}) - } -} diff --git a/pkg/chunk/by_key_test.go b/pkg/chunk/by_key_test.go deleted file mode 100644 index 3c63b0318d7..00000000000 --- a/pkg/chunk/by_key_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package chunk - -import ( - "math/rand" - "reflect" - "sort" - "testing" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" -) - -func c(id string) Chunk { - return Chunk{UserID: id} -} - -func TestUnique(t *testing.T) { - for _, tc := range []struct { - in ByKey - want ByKey - }{ - {nil, ByKey{}}, - {ByKey{c("a"), c("a")}, ByKey{c("a")}}, - {ByKey{c("a"), c("a"), c("b"), c("b"), c("c")}, ByKey{c("a"), c("b"), c("c")}}, - {ByKey{c("a"), c("b"), c("c")}, ByKey{c("a"), c("b"), c("c")}}, - } { - have := unique(tc.in) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func TestMerge(t *testing.T) { - type args struct { - a ByKey - b ByKey - } - for _, tc := range []struct { - args args - want ByKey - }{ - {args{ByKey{}, ByKey{}}, ByKey{}}, - {args{ByKey{c("a")}, ByKey{}}, ByKey{c("a")}}, - {args{ByKey{}, ByKey{c("b")}}, ByKey{c("b")}}, - {args{ByKey{c("a")}, ByKey{c("b")}}, ByKey{c("a"), c("b")}}, - { - args{ByKey{c("a"), c("c")}, ByKey{c("a"), c("b"), c("d")}}, - ByKey{c("a"), c("b"), c("c"), c("d")}, - }, - } { - have := merge(tc.args.a, tc.args.b) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func TestNWayUnion(t *testing.T) { - for _, tc := range []struct { - in []ByKey - want ByKey - }{ - {nil, ByKey{}}, - {[]ByKey{{c("a")}}, ByKey{c("a")}}, - {[]ByKey{{c("a")}, {c("a")}}, ByKey{c("a")}}, - {[]ByKey{{c("a")}, {}}, ByKey{c("a")}}, - {[]ByKey{{}, {c("b")}}, ByKey{c("b")}}, - {[]ByKey{{c("a")}, {c("b")}}, ByKey{c("a"), c("b")}}, - { - []ByKey{{c("a"), c("c"), c("e")}, {c("c"), c("d")}, {c("b")}}, - ByKey{c("a"), c("b"), c("c"), c("d"), c("e")}, - }, - { - []ByKey{{c("c"), c("d")}, {c("b")}, {c("a"), c("c"), c("e")}}, - ByKey{c("a"), c("b"), c("c"), c("d"), c("e")}, - }, - } { - have := nWayUnion(tc.in) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func TestNWayIntersect(t *testing.T) { - for _, tc := range []struct { - in []ByKey - want ByKey - }{ - {nil, ByKey{}}, - {[]ByKey{{c("a"), c("b"), c("c")}}, []Chunk{c("a"), c("b"), c("c")}}, - {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}}, ByKey{c("a"), c("c")}}, - {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}, {c("b")}}, ByKey{}}, - {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}, {c("a")}}, ByKey{c("a")}}, - } { - have := nWayIntersect(tc.in) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func BenchmarkByKeyLess(b *testing.B) { - now := model.Now() - a := ByKey{dummyChunk(now), dummyChunk(now)} - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - a.Less(0, 1) - } -} - -func BenchmarkByKeySort100(b *testing.B) { benchmarkByKeySort(b, 100) } -func BenchmarkByKeySort1000(b *testing.B) { benchmarkByKeySort(b, 1000) } -func BenchmarkByKeySort10000(b *testing.B) { benchmarkByKeySort(b, 10000) } - -func benchmarkByKeySort(b *testing.B, batchSize int) { - chunks := []Chunk{} - for i := 0; i < batchSize; i++ { - chunk := dummyChunk(model.Now() + model.Time(rand.Intn(batchSize))) - chunks = append(chunks, chunk) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - sort.Sort(ByKey(chunks)) - } -} diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 09bd6d0236c..87343017a8a 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -314,6 +314,11 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { return c.Data.UnmarshalFromBuf(remainingData[:int(dataLen)]) } +func equalByKey(a, b Chunk) bool { + return a.UserID == b.UserID && a.Fingerprint == b.Fingerprint && + a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum +} + func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) { sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix") defer sp.Finish() diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index c7482ec1d37..dfe37cd7f68 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -2,7 +2,6 @@ package chunk import ( "context" - "encoding/json" "flag" "fmt" "sort" @@ -53,7 +52,9 @@ func init() { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { CacheConfig cache.Config - MinChunkAge time.Duration + + MinChunkAge time.Duration + QueryChunkLimit int // For injecting different schemas in tests. schemaFactory func(cfg SchemaConfig) Schema @@ -62,7 +63,8 @@ type StoreConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlags(f) - f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "minimum time between chunk update and being saved to the store") + f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") + f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") } // Store implements Store @@ -168,6 +170,10 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch // Get implements ChunkStore func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) { + + logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("msg", "ChunkStore.Get", "from", from, "through", through, "matchers", len(allMatchers)) + sp, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") defer sp.Finish() @@ -215,12 +221,16 @@ func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Tim func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { logger := util.WithContext(ctx, util.Logger) + level.Debug(logger).Log("func", "ChunkStore.getMetricNameChunks", "from", from, "through", through, "matchers", len(allMatchers), "metricName", metricName) + filters, matchers := util.SplitFiltersAndMatchers(allMatchers) chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName) if err != nil { return nil, err } + level.Debug(logger).Log("func", "ChunkStore.getMetricNameChunks", "msg", "Chunks in index", "n", len(chunks)) + // Filter out chunks that are not in the selected time range. filtered := make([]Chunk, 0, len(chunks)) keys := make([]string, 0, len(chunks)) @@ -232,6 +242,11 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim keys = append(keys, chunk.ExternalKey()) } + level.Debug(logger).Log("func", "ChunkStore.getMetricNameChunks", "msg", "Chunks post filtering", "n", len(chunks)) + if len(filtered) > c.cfg.QueryChunkLimit { + return nil, fmt.Errorf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), c.cfg.QueryChunkLimit) + } + // Now fetch the actual chunk data from Memcache / S3 cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) if err != nil { @@ -254,10 +269,7 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim return nil, promql.ErrStorage(err) } - // TODO instead of doing this sort, propagate an index and assign chunks - // into the result based on that index. allChunks := append(fromCache, fromStorage...) - sort.Sort(ByKey(allChunks)) // Filter out chunks filteredChunks := make([]Chunk, 0, len(allChunks)) @@ -374,6 +386,8 @@ outer: } func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { + logger := util.WithContext(ctx, util.Logger) + userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err @@ -385,17 +399,25 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode if err != nil { return nil, err } + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "queries", len(queries)) entries, err := c.lookupEntriesByQueries(ctx, queries) if err != nil { return nil, err } + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "entries", len(entries)) - return c.convertIndexEntriesToChunks(ctx, entries, nil) + chunkIDs, err := c.parseIndexEntries(ctx, entries, nil) + if err != nil { + return nil, err + } + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "chunkIDs", len(chunkIDs)) + + return c.convertChunkIDsToChunks(ctx, chunkIDs) } // Otherwise get chunks which include other matchers - incomingChunkSets := make(chan ByKey) + incomingChunkIDs := make(chan []string) incomingErrors := make(chan error) for _, matcher := range matchers { go func(matcher *labels.Matcher) { @@ -411,6 +433,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "matcher", matcher, "queries", len(queries)) // Lookup IndexEntry's entries, err := c.lookupEntriesByQueries(ctx, queries) @@ -418,31 +441,40 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "matcher", matcher, "entries", len(entries)) - // Convert IndexEntry's into chunks - chunks, err := c.convertIndexEntriesToChunks(ctx, entries, matcher) + // Convert IndexEntry's to chunk IDs, filter out non-matchers at the same time. + chunkIDs, err := c.parseIndexEntries(ctx, entries, matcher) if err != nil { incomingErrors <- err - } else { - incomingChunkSets <- chunks + return } + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "matcher", matcher, "chunkIDs", len(chunkIDs)) + incomingChunkIDs <- chunkIDs }(matcher) } // Receive chunkSets from all matchers - var chunkSets []ByKey + var chunkIDSets [][]string var lastErr error for i := 0; i < len(matchers); i++ { select { - case incoming := <-incomingChunkSets: - chunkSets = append(chunkSets, incoming) + case incoming := <-incomingChunkIDs: + chunkIDSets = append(chunkIDSets, incoming) case err := <-incomingErrors: lastErr = err } } + if lastErr != nil { + return nil, lastErr + } + + // Merge entries in order because we wish to keep label series together consecutively + chunkIDs := nWayIntersectStrings(chunkIDSets) + level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "msg", "post intersection", "entries", len(chunkIDs)) - // Merge chunkSets in order because we wish to keep label series together consecutively - return nWayIntersect(chunkSets), lastErr + // Convert IndexEntry's into chunks + return c.convertChunkIDsToChunks(ctx, chunkIDs) } func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { @@ -495,44 +527,43 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I return entries, nil } -func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) (ByKey, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - - var chunkSet ByKey +func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { + result := make([]string, 0, len(entries)) for _, entry := range entries { - chunkKey, labelValue, metadataInIndex, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) if err != nil { return nil, err } - chunk, err := ParseExternalKey(userID, chunkKey) - if err != nil { - return nil, err + if matcher != nil && !matcher.Matches(string(labelValue)) { + level.Debug(util.WithContext(ctx, util.Logger)).Log("msg", "dropping chunk for non-matching label", "label", labelValue) + continue } + result = append(result, chunkKey) + } - // This can be removed in Dev 2017, 13 months after the last chunks - // was written with metadata in the index. - if metadataInIndex && entry.Value != nil { - if err := json.Unmarshal(entry.Value, &chunk); err != nil { - return nil, err - } - chunk.metadataInIndex = true - } + sort.Strings(result) + result = uniqueStrings(result) + return result, nil +} - if matcher != nil && !matcher.Matches(string(labelValue)) { - level.Debug(util.WithContext(ctx, util.Logger)).Log("msg", "dropping chunk for non-matching metric", "metric", chunk.Metric) - continue +func (c *Store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + chunkSet := make([]Chunk, 0, len(chunkIDs)) + for _, chunkID := range chunkIDs { + chunk, err := ParseExternalKey(userID, chunkID) + if err != nil { + return nil, err } chunkSet = append(chunkSet, chunk) } - // Return chunks sorted and deduped because they will be merged with other sets - sort.Sort(chunkSet) - return unique(chunkSet), nil + return chunkSet, nil } func (c *Store) writeBackCache(ctx context.Context, chunks []Chunk) error { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 663b7329114..201a3187c9b 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -21,6 +21,12 @@ import ( "github.com/weaveworks/common/user" ) +func init() { + var al util.AllowedLevel + al.Set("debug") + util.InitLogger(al) +} + // newTestStore creates a new Store for testing. func newTestChunkStore(t *testing.T, cfg StoreConfig) *Store { storage := NewMockStorage() @@ -194,7 +200,8 @@ func TestChunkStore_Get(t *testing.T) { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, + schemaFactory: schema.fn, + QueryChunkLimit: 2e6, }) if err := store.Put(ctx, []Chunk{ @@ -333,7 +340,8 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, + schemaFactory: schema.fn, + QueryChunkLimit: 2e6, }) if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { @@ -378,7 +386,8 @@ func TestChunkStoreRandom(t *testing.T) { for i := range schemas { schemas[i].store = newTestChunkStore(t, StoreConfig{ - schemaFactory: schemas[i].fn, + schemaFactory: schemas[i].fn, + QueryChunkLimit: 2e6, }) } @@ -447,7 +456,8 @@ func TestChunkStoreLeastRead(t *testing.T) { // Test we don't read too much from the index ctx := user.InjectOrgID(context.Background(), userID) store := newTestChunkStore(t, StoreConfig{ - schemaFactory: v6Schema, + schemaFactory: v6Schema, + QueryChunkLimit: 2e6, }) // Put 24 chunks 1hr chunks in the store diff --git a/pkg/chunk/storage/by_key_test.go b/pkg/chunk/storage/by_key_test.go new file mode 100644 index 00000000000..f81ced792ac --- /dev/null +++ b/pkg/chunk/storage/by_key_test.go @@ -0,0 +1,21 @@ +package storage + +import ( + "github.com/weaveworks/cortex/pkg/chunk" +) + +// ByKey allow you to sort chunks by ID +type ByKey []chunk.Chunk + +func (cs ByKey) Len() int { return len(cs) } +func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } +func (cs ByKey) Less(i, j int) bool { return lessByKey(cs[i], cs[j]) } + +// This comparison uses all the same information as Chunk.ExternalKey() +func lessByKey(a, b chunk.Chunk) bool { + return a.UserID < b.UserID || + (a.UserID == b.UserID && (a.Fingerprint < b.Fingerprint || + (a.Fingerprint == b.Fingerprint && (a.From < b.From || + (a.From == b.From && (a.Through < b.Through || + (a.Through == b.Through && a.Checksum < b.Checksum))))))) +} diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index e3c5947daf2..c661a878721 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -49,8 +49,8 @@ func TestChunksBasic(t *testing.T) { require.NoError(t, err) require.Equal(t, len(chunksToGet), len(chunksWeGot)) - sort.Sort(chunk.ByKey(chunksToGet)) - sort.Sort(chunk.ByKey(chunksWeGot)) + sort.Sort(ByKey(chunksToGet)) + sort.Sort(ByKey(chunksWeGot)) for j := 0; j < len(chunksWeGot); j++ { require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) } diff --git a/pkg/chunk/strings.go b/pkg/chunk/strings.go new file mode 100644 index 00000000000..0d91e640d2d --- /dev/null +++ b/pkg/chunk/strings.go @@ -0,0 +1,59 @@ +package chunk + +func uniqueStrings(cs []string) []string { + if len(cs) == 0 { + return []string{} + } + + result := make([]string, 1, len(cs)) + result[0] = cs[0] + i, j := 0, 1 + for j < len(cs) { + if result[i] == cs[j] { + j++ + continue + } + result = append(result, cs[j]) + i++ + j++ + } + return result +} + +func intersectStrings(left, right []string) []string { + var ( + i, j = 0, 0 + result = []string{} + ) + for i < len(left) && j < len(right) { + if left[i] == right[j] { + result = append(result, left[i]) + } + + if left[i] < right[j] { + i++ + } else { + j++ + } + } + return result +} + +func nWayIntersectStrings(sets [][]string) []string { + l := len(sets) + switch l { + case 0: + return []string{} + case 1: + return sets[0] + case 2: + return intersectStrings(sets[0], sets[1]) + default: + var ( + split = l / 2 + left = nWayIntersectStrings(sets[:split]) + right = nWayIntersectStrings(sets[split:]) + ) + return intersectStrings(left, right) + } +} From aef2e91beafce5428bd806327e3df8ef79b384e4 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 10 Jul 2018 17:57:36 +0100 Subject: [PATCH 2/6] Review feedback: Use simple Less function for chunks in test. Signed-off-by: Tom Wilkie --- pkg/chunk/storage/by_key_test.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/pkg/chunk/storage/by_key_test.go b/pkg/chunk/storage/by_key_test.go index f81ced792ac..8d157b80956 100644 --- a/pkg/chunk/storage/by_key_test.go +++ b/pkg/chunk/storage/by_key_test.go @@ -9,13 +9,4 @@ type ByKey []chunk.Chunk func (cs ByKey) Len() int { return len(cs) } func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } -func (cs ByKey) Less(i, j int) bool { return lessByKey(cs[i], cs[j]) } - -// This comparison uses all the same information as Chunk.ExternalKey() -func lessByKey(a, b chunk.Chunk) bool { - return a.UserID < b.UserID || - (a.UserID == b.UserID && (a.Fingerprint < b.Fingerprint || - (a.Fingerprint == b.Fingerprint && (a.From < b.From || - (a.From == b.From && (a.Through < b.Through || - (a.Through == b.Through && a.Checksum < b.Checksum))))))) -} +func (cs ByKey) Less(i, j int) bool { return cs[i].ExternalKey() < cs[j].ExternalKey() } From 445d0064a5f2080fb0a1a85f334c3166a9df1d5e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 10 Jul 2018 17:58:33 +0100 Subject: [PATCH 3/6] Review feedback: Intersect chunkIDs as we receive them. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index dfe37cd7f68..7924ebe13d9 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -455,12 +455,16 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode } // Receive chunkSets from all matchers - var chunkIDSets [][]string + var chunkIDs []string var lastErr error for i := 0; i < len(matchers); i++ { select { case incoming := <-incomingChunkIDs: - chunkIDSets = append(chunkIDSets, incoming) + if chunkIDs == nil { + chunkIDs = incoming + } else { + chunkIDs = intersectStrings(chunkIDs, incoming) + } case err := <-incomingErrors: lastErr = err } @@ -469,9 +473,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode return nil, lastErr } - // Merge entries in order because we wish to keep label series together consecutively - chunkIDs := nWayIntersectStrings(chunkIDSets) - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "msg", "post intersection", "entries", len(chunkIDs)) + level.Debug(log).Log("msg", "post intersection", "entries", len(chunkIDs)) // Convert IndexEntry's into chunks return c.convertChunkIDsToChunks(ctx, chunkIDs) @@ -543,6 +545,7 @@ func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, mat result = append(result, chunkKey) } + // Return ids sorted and deduped because they will be merged with other sets. sort.Strings(result) result = uniqueStrings(result) return result, nil From 1557226f106ab4801f69342db5a9ab91e6c7aa78 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 10 Jul 2018 17:59:03 +0100 Subject: [PATCH 4/6] Review feedback: unify logging and tracing in the chunk store. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 74 ++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 7924ebe13d9..96bd87da552 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -7,6 +7,7 @@ import ( "sort" "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -168,24 +169,44 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch return writeReqs, nil } +type spanLogger struct { + log.Logger + ot.Span +} + +func newSpanLogger(ctx context.Context, method string) (*spanLogger, context.Context) { + span, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") + return &spanLogger{ + Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), + Span: span, + }, ctx +} + +func (s *spanLogger) Log(kvps ...interface{}) error { + s.Logger.Log(kvps...) + fields, err := otlog.InterleavedKVToFields(kvps...) + if err != nil { + return err + } + s.Span.LogFields(fields...) + return nil +} + // Get implements ChunkStore func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.Get") + defer log.Span.Finish() - logger := util.WithContext(ctx, util.Logger) - level.Debug(logger).Log("msg", "ChunkStore.Get", "from", from, "through", through, "matchers", len(allMatchers)) - - sp, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") - defer sp.Finish() + now := model.Now() + level.Debug(log).Log("from", from, "through", through, "now", now, "matchers", len(allMatchers)) if through < from { return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from) } - now := model.Now() - sp.LogFields(otlog.String("from", from.String()), otlog.String("through", through.String()), otlog.String("now", now.String())) if from.After(now) { // time-span start is in future ... regard as legal - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) + level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) return nil, nil } @@ -196,14 +217,14 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers . if through.After(now.Add(5 * time.Minute)) { // time-span end is in future ... regard as legal - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now) + level.Error(log).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now) through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } // Fetch metric name chunks if the matcher is of type equal, metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(allMatchers) if ok && metricNameMatcher.Type == labels.MatchEqual { - sp.SetTag("metric", metricNameMatcher.Value) + log.Span.SetTag("metric", metricNameMatcher.Value) return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value) } @@ -220,16 +241,15 @@ func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Tim } func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { - logger := util.WithContext(ctx, util.Logger) - level.Debug(logger).Log("func", "ChunkStore.getMetricNameChunks", "from", from, "through", through, "matchers", len(allMatchers), "metricName", metricName) + log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks") + level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "matchers", len(allMatchers)) filters, matchers := util.SplitFiltersAndMatchers(allMatchers) chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName) if err != nil { return nil, err } - - level.Debug(logger).Log("func", "ChunkStore.getMetricNameChunks", "msg", "Chunks in index", "n", len(chunks)) + level.Debug(log).Log("Chunks in index", len(chunks)) // Filter out chunks that are not in the selected time range. filtered := make([]Chunk, 0, len(chunks)) @@ -241,28 +261,30 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim filtered = append(filtered, chunk) keys = append(keys, chunk.ExternalKey()) } + level.Debug(log).Log("Chunks post filtering", len(chunks)) - level.Debug(logger).Log("func", "ChunkStore.getMetricNameChunks", "msg", "Chunks post filtering", "n", len(chunks)) if len(filtered) > c.cfg.QueryChunkLimit { - return nil, fmt.Errorf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), c.cfg.QueryChunkLimit) + err := fmt.Errorf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), c.cfg.QueryChunkLimit) + level.Error(log).Log("err", err) + return nil, err } // Now fetch the actual chunk data from Memcache / S3 cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) if err != nil { - level.Warn(logger).Log("msg", "error fetching from cache", "err", err) + level.Warn(log).Log("msg", "error fetching from cache", "err", err) } fromCache, missing, err := ProcessCacheResponse(filtered, cacheHits, cacheBufs) if err != nil { - level.Warn(logger).Log("msg", "error fetching from cache", "err", err) + level.Warn(log).Log("msg", "error fetching from cache", "err", err) } fromStorage, err := c.storage.GetChunks(ctx, missing) // Always cache any chunks we did get if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { - level.Warn(logger).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } if err != nil { @@ -386,7 +408,7 @@ outer: } func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { - logger := util.WithContext(ctx, util.Logger) + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") userID, err := user.ExtractOrgID(ctx) if err != nil { @@ -399,19 +421,19 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode if err != nil { return nil, err } - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "queries", len(queries)) + level.Debug(log).Log("queries", len(queries)) entries, err := c.lookupEntriesByQueries(ctx, queries) if err != nil { return nil, err } - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "entries", len(entries)) + level.Debug(log).Log("entries", len(entries)) chunkIDs, err := c.parseIndexEntries(ctx, entries, nil) if err != nil { return nil, err } - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "chunkIDs", len(chunkIDs)) + level.Debug(log).Log("chunkIDs", len(chunkIDs)) return c.convertChunkIDsToChunks(ctx, chunkIDs) } @@ -433,7 +455,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "matcher", matcher, "queries", len(queries)) + level.Debug(log).Log("matcher", matcher, "queries", len(queries)) // Lookup IndexEntry's entries, err := c.lookupEntriesByQueries(ctx, queries) @@ -441,7 +463,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "matcher", matcher, "entries", len(entries)) + level.Debug(log).Log("matcher", matcher, "entries", len(entries)) // Convert IndexEntry's to chunk IDs, filter out non-matchers at the same time. chunkIDs, err := c.parseIndexEntries(ctx, entries, matcher) @@ -449,7 +471,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } - level.Debug(logger).Log("func", "ChunkStore.lookupChunksByMetricName", "matcher", matcher, "chunkIDs", len(chunkIDs)) + level.Debug(log).Log("matcher", matcher, "chunkIDs", len(chunkIDs)) incomingChunkIDs <- chunkIDs }(matcher) } From 16e116740daca658c1928dc699b7adfbcc054d20 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 10 Jul 2018 18:00:25 +0100 Subject: [PATCH 5/6] Review feedback: remove debug logging from tests. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 201a3187c9b..94bb6733e67 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -21,12 +21,6 @@ import ( "github.com/weaveworks/common/user" ) -func init() { - var al util.AllowedLevel - al.Set("debug") - util.InitLogger(al) -} - // newTestStore creates a new Store for testing. func newTestChunkStore(t *testing.T, cfg StoreConfig) *Store { storage := NewMockStorage() From b207ccc5cccf19cad45e9f0af6428c3b06052688 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 12 Jul 2018 10:57:24 +0100 Subject: [PATCH 6/6] Review feedback: explain spanLogger. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index 96bd87da552..8c642c1420f 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -169,6 +169,7 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch return writeReqs, nil } +// spanLogger unifies tracing and logging, to reduce repetition. type spanLogger struct { log.Logger ot.Span