diff --git a/pkg/chunk/by_key.go b/pkg/chunk/by_key.go deleted file mode 100644 index c4674c1738c..00000000000 --- a/pkg/chunk/by_key.go +++ /dev/null @@ -1,128 +0,0 @@ -package chunk - -// ByKey allow you to sort chunks by ID -type ByKey []Chunk - -func (cs ByKey) Len() int { return len(cs) } -func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } -func (cs ByKey) Less(i, j int) bool { return lessByKey(cs[i], cs[j]) } - -// This comparison uses all the same information as Chunk.ExternalKey() -func lessByKey(a, b Chunk) bool { - return a.UserID < b.UserID || - (a.UserID == b.UserID && (a.Fingerprint < b.Fingerprint || - (a.Fingerprint == b.Fingerprint && (a.From < b.From || - (a.From == b.From && (a.Through < b.Through || - (a.Through == b.Through && a.Checksum < b.Checksum))))))) -} - -func equalByKey(a, b Chunk) bool { - return a.UserID == b.UserID && a.Fingerprint == b.Fingerprint && - a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum -} - -// unique will remove duplicates from the input. -// list must be sorted. -func unique(cs ByKey) ByKey { - if len(cs) == 0 { - return ByKey{} - } - - result := make(ByKey, 1, len(cs)) - result[0] = cs[0] - i, j := 0, 1 - for j < len(cs) { - if equalByKey(result[i], cs[j]) { - j++ - continue - } - result = append(result, cs[j]) - i++ - j++ - } - return result -} - -// merge will merge & dedupe two lists of chunks. -// list musts be sorted and not contain dupes. -func merge(a, b ByKey) ByKey { - result := make(ByKey, 0, len(a)+len(b)) - i, j := 0, 0 - for i < len(a) && j < len(b) { - if lessByKey(a[i], b[j]) { - result = append(result, a[i]) - i++ - } else if lessByKey(b[j], a[i]) { - result = append(result, b[j]) - j++ - } else { - result = append(result, a[i]) - i++ - j++ - } - } - for ; i < len(a); i++ { - result = append(result, a[i]) - } - for ; j < len(b); j++ { - result = append(result, b[j]) - } - return result -} - -// nWayUnion will merge and dedupe n lists of chunks. 
-// lists must be sorted and not contain dupes. -func nWayUnion(sets []ByKey) ByKey { - l := len(sets) - switch l { - case 0: - return ByKey{} - case 1: - return sets[0] - case 2: - return merge(sets[0], sets[1]) - default: - var ( - split = l / 2 - left = nWayUnion(sets[:split]) - right = nWayUnion(sets[split:]) - ) - return nWayUnion([]ByKey{left, right}) - } -} - -// nWayIntersect will interesct n sorted lists of chunks. -func nWayIntersect(sets []ByKey) ByKey { - l := len(sets) - switch l { - case 0: - return ByKey{} - case 1: - return sets[0] - case 2: - var ( - left, right = sets[0], sets[1] - i, j = 0, 0 - result = []Chunk{} - ) - for i < len(left) && j < len(right) { - if equalByKey(left[i], right[j]) { - result = append(result, left[i]) - } - - if lessByKey(left[i], right[j]) { - i++ - } else { - j++ - } - } - return result - default: - var ( - split = l / 2 - left = nWayIntersect(sets[:split]) - right = nWayIntersect(sets[split:]) - ) - return nWayIntersect([]ByKey{left, right}) - } -} diff --git a/pkg/chunk/by_key_test.go b/pkg/chunk/by_key_test.go deleted file mode 100644 index 3c63b0318d7..00000000000 --- a/pkg/chunk/by_key_test.go +++ /dev/null @@ -1,131 +0,0 @@ -package chunk - -import ( - "math/rand" - "reflect" - "sort" - "testing" - - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" -) - -func c(id string) Chunk { - return Chunk{UserID: id} -} - -func TestUnique(t *testing.T) { - for _, tc := range []struct { - in ByKey - want ByKey - }{ - {nil, ByKey{}}, - {ByKey{c("a"), c("a")}, ByKey{c("a")}}, - {ByKey{c("a"), c("a"), c("b"), c("b"), c("c")}, ByKey{c("a"), c("b"), c("c")}}, - {ByKey{c("a"), c("b"), c("c")}, ByKey{c("a"), c("b"), c("c")}}, - } { - have := unique(tc.in) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func TestMerge(t *testing.T) { - type args struct { - a ByKey - b ByKey - } - for _, tc := range []struct { - args args - want ByKey - }{ - {args{ByKey{}, ByKey{}}, 
ByKey{}}, - {args{ByKey{c("a")}, ByKey{}}, ByKey{c("a")}}, - {args{ByKey{}, ByKey{c("b")}}, ByKey{c("b")}}, - {args{ByKey{c("a")}, ByKey{c("b")}}, ByKey{c("a"), c("b")}}, - { - args{ByKey{c("a"), c("c")}, ByKey{c("a"), c("b"), c("d")}}, - ByKey{c("a"), c("b"), c("c"), c("d")}, - }, - } { - have := merge(tc.args.a, tc.args.b) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func TestNWayUnion(t *testing.T) { - for _, tc := range []struct { - in []ByKey - want ByKey - }{ - {nil, ByKey{}}, - {[]ByKey{{c("a")}}, ByKey{c("a")}}, - {[]ByKey{{c("a")}, {c("a")}}, ByKey{c("a")}}, - {[]ByKey{{c("a")}, {}}, ByKey{c("a")}}, - {[]ByKey{{}, {c("b")}}, ByKey{c("b")}}, - {[]ByKey{{c("a")}, {c("b")}}, ByKey{c("a"), c("b")}}, - { - []ByKey{{c("a"), c("c"), c("e")}, {c("c"), c("d")}, {c("b")}}, - ByKey{c("a"), c("b"), c("c"), c("d"), c("e")}, - }, - { - []ByKey{{c("c"), c("d")}, {c("b")}, {c("a"), c("c"), c("e")}}, - ByKey{c("a"), c("b"), c("c"), c("d"), c("e")}, - }, - } { - have := nWayUnion(tc.in) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func TestNWayIntersect(t *testing.T) { - for _, tc := range []struct { - in []ByKey - want ByKey - }{ - {nil, ByKey{}}, - {[]ByKey{{c("a"), c("b"), c("c")}}, []Chunk{c("a"), c("b"), c("c")}}, - {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}}, ByKey{c("a"), c("c")}}, - {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}, {c("b")}}, ByKey{}}, - {[]ByKey{{c("a"), c("b"), c("c")}, {c("a"), c("c")}, {c("a")}}, ByKey{c("a")}}, - } { - have := nWayIntersect(tc.in) - if !reflect.DeepEqual(tc.want, have) { - assert.Equal(t, tc.want, have) - } - } -} - -func BenchmarkByKeyLess(b *testing.B) { - now := model.Now() - a := ByKey{dummyChunk(now), dummyChunk(now)} - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - a.Less(0, 1) - } -} - -func BenchmarkByKeySort100(b *testing.B) { benchmarkByKeySort(b, 100) } -func BenchmarkByKeySort1000(b *testing.B) { 
benchmarkByKeySort(b, 1000) } -func BenchmarkByKeySort10000(b *testing.B) { benchmarkByKeySort(b, 10000) } - -func benchmarkByKeySort(b *testing.B, batchSize int) { - chunks := []Chunk{} - for i := 0; i < batchSize; i++ { - chunk := dummyChunk(model.Now() + model.Time(rand.Intn(batchSize))) - chunks = append(chunks, chunk) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - sort.Sort(ByKey(chunks)) - } -} diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 09bd6d0236c..87343017a8a 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -314,6 +314,11 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { return c.Data.UnmarshalFromBuf(remainingData[:int(dataLen)]) } +func equalByKey(a, b Chunk) bool { + return a.UserID == b.UserID && a.Fingerprint == b.Fingerprint && + a.From == b.From && a.Through == b.Through && a.Checksum == b.Checksum +} + func chunksToMatrix(ctx context.Context, chunks []Chunk, from, through model.Time) (model.Matrix, error) { sp, ctx := ot.StartSpanFromContext(ctx, "chunksToMatrix") defer sp.Finish() diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index c7482ec1d37..8c642c1420f 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -2,12 +2,12 @@ package chunk import ( "context" - "encoding/json" "flag" "fmt" "sort" "time" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" ot "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" @@ -53,7 +53,9 @@ func init() { // StoreConfig specifies config for a ChunkStore type StoreConfig struct { CacheConfig cache.Config - MinChunkAge time.Duration + + MinChunkAge time.Duration + QueryChunkLimit int // For injecting different schemas in tests. 
schemaFactory func(cfg SchemaConfig) Schema @@ -62,7 +64,8 @@ type StoreConfig struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) { cfg.CacheConfig.RegisterFlags(f) - f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "minimum time between chunk update and being saved to the store") + f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.") + f.IntVar(&cfg.QueryChunkLimit, "store.query-chunk-limit", 2e6, "Maximum number of chunks that can be fetched in a single query.") } // Store implements Store @@ -166,20 +169,45 @@ func (c *Store) calculateDynamoWrites(userID string, chunks []Chunk) (WriteBatch return writeReqs, nil } +// spanLogger unifies tracing and logging, to reduce repetition. +type spanLogger struct { + log.Logger + ot.Span +} + +func newSpanLogger(ctx context.Context, method string) (*spanLogger, context.Context) { + span, ctx := ot.StartSpanFromContext(ctx, method) + return &spanLogger{ + Logger: log.With(util.WithContext(ctx, util.Logger), "method", method), + Span: span, + }, ctx +} + +func (s *spanLogger) Log(kvps ...interface{}) error { + s.Logger.Log(kvps...) + fields, err := otlog.InterleavedKVToFields(kvps...) + if err != nil { + return err + } + s.Span.LogFields(fields...) 
+ return nil +} + // Get implements ChunkStore func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) (model.Matrix, error) { - sp, ctx := ot.StartSpanFromContext(ctx, "ChunkStore.Get") - defer sp.Finish() + log, ctx := newSpanLogger(ctx, "ChunkStore.Get") + defer log.Span.Finish() + + now := model.Now() + level.Debug(log).Log("from", from, "through", through, "now", now, "matchers", len(allMatchers)) if through < from { return nil, fmt.Errorf("invalid query, through < from (%d < %d)", through, from) } - now := model.Now() - sp.LogFields(otlog.String("from", from.String()), otlog.String("through", through.String()), otlog.String("now", now.String())) if from.After(now) { // time-span start is in future ... regard as legal - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) + level.Error(log).Log("msg", "whole timerange in future, yield empty resultset", "through", through, "from", from, "now", now) return nil, nil } @@ -190,14 +218,14 @@ func (c *Store) Get(ctx context.Context, from, through model.Time, allMatchers . if through.After(now.Add(5 * time.Minute)) { // time-span end is in future ... 
regard as legal - level.Error(util.WithContext(ctx, util.Logger)).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now) + level.Error(log).Log("msg", "adjusting end timerange from future to now", "old_through", through, "new_through", now) through = now // Avoid processing future part - otherwise some schemas could fail with eg non-existent table gripes } // Fetch metric name chunks if the matcher is of type equal, metricNameMatcher, matchers, ok := util.ExtractMetricNameMatcherFromMatchers(allMatchers) if ok && metricNameMatcher.Type == labels.MatchEqual { - sp.SetTag("metric", metricNameMatcher.Value) + log.Span.SetTag("metric", metricNameMatcher.Value) return c.getMetricNameMatrix(ctx, from, through, matchers, metricNameMatcher.Value) } @@ -214,12 +242,15 @@ func (c *Store) getMetricNameMatrix(ctx context.Context, from, through model.Tim } func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) { - logger := util.WithContext(ctx, util.Logger) + log, ctx := newSpanLogger(ctx, "ChunkStore.getMetricNameChunks") + level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "matchers", len(allMatchers)) + filters, matchers := util.SplitFiltersAndMatchers(allMatchers) chunks, err := c.lookupChunksByMetricName(ctx, from, through, matchers, metricName) if err != nil { return nil, err } + level.Debug(log).Log("Chunks in index", len(chunks)) // Filter out chunks that are not in the selected time range. 
filtered := make([]Chunk, 0, len(chunks)) @@ -231,33 +262,37 @@ func (c *Store) getMetricNameChunks(ctx context.Context, from, through model.Tim filtered = append(filtered, chunk) keys = append(keys, chunk.ExternalKey()) } + level.Debug(log).Log("Chunks post filtering", len(filtered)) + + if len(filtered) > c.cfg.QueryChunkLimit { + err := fmt.Errorf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), c.cfg.QueryChunkLimit) + level.Error(log).Log("err", err) + return nil, err + } // Now fetch the actual chunk data from Memcache / S3 cacheHits, cacheBufs, _, err := c.cache.FetchChunkData(ctx, keys) if err != nil { - level.Warn(logger).Log("msg", "error fetching from cache", "err", err) + level.Warn(log).Log("msg", "error fetching from cache", "err", err) } fromCache, missing, err := ProcessCacheResponse(filtered, cacheHits, cacheBufs) if err != nil { - level.Warn(logger).Log("msg", "error fetching from cache", "err", err) + level.Warn(log).Log("msg", "error fetching from cache", "err", err) } fromStorage, err := c.storage.GetChunks(ctx, missing) // Always cache any chunks we did get if cacheErr := c.writeBackCache(ctx, fromStorage); cacheErr != nil { - level.Warn(logger).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) + level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr) } if err != nil { return nil, promql.ErrStorage(err) } - // TODO instead of doing this sort, propagate an index and assign chunks - // into the result based on that index. allChunks := append(fromCache, fromStorage...) 
- sort.Sort(ByKey(allChunks)) // Filter out chunks filteredChunks := make([]Chunk, 0, len(allChunks)) @@ -374,6 +409,8 @@ outer: } func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) { + log, ctx := newSpanLogger(ctx, "ChunkStore.lookupChunksByMetricName") + userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, err @@ -385,17 +422,25 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode if err != nil { return nil, err } + level.Debug(log).Log("queries", len(queries)) entries, err := c.lookupEntriesByQueries(ctx, queries) if err != nil { return nil, err } + level.Debug(log).Log("entries", len(entries)) - return c.convertIndexEntriesToChunks(ctx, entries, nil) + chunkIDs, err := c.parseIndexEntries(ctx, entries, nil) + if err != nil { + return nil, err + } + level.Debug(log).Log("chunkIDs", len(chunkIDs)) + + return c.convertChunkIDsToChunks(ctx, chunkIDs) } // Otherwise get chunks which include other matchers - incomingChunkSets := make(chan ByKey) + incomingChunkIDs := make(chan []string) incomingErrors := make(chan error) for _, matcher := range matchers { go func(matcher *labels.Matcher) { @@ -411,6 +456,7 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } + level.Debug(log).Log("matcher", matcher, "queries", len(queries)) // Lookup IndexEntry's entries, err := c.lookupEntriesByQueries(ctx, queries) @@ -418,31 +464,42 @@ func (c *Store) lookupChunksByMetricName(ctx context.Context, from, through mode incomingErrors <- err return } + level.Debug(log).Log("matcher", matcher, "entries", len(entries)) - // Convert IndexEntry's into chunks - chunks, err := c.convertIndexEntriesToChunks(ctx, entries, matcher) + // Convert IndexEntry's to chunk IDs, filter out non-matchers at the same time. 
+ chunkIDs, err := c.parseIndexEntries(ctx, entries, matcher) if err != nil { incomingErrors <- err - } else { - incomingChunkSets <- chunks + return } + level.Debug(log).Log("matcher", matcher, "chunkIDs", len(chunkIDs)) + incomingChunkIDs <- chunkIDs }(matcher) } // Receive chunkSets from all matchers - var chunkSets []ByKey + var chunkIDs []string var lastErr error for i := 0; i < len(matchers); i++ { select { - case incoming := <-incomingChunkSets: - chunkSets = append(chunkSets, incoming) + case incoming := <-incomingChunkIDs: + if chunkIDs == nil { + chunkIDs = incoming + } else { + chunkIDs = intersectStrings(chunkIDs, incoming) + } case err := <-incomingErrors: lastErr = err } } + if lastErr != nil { + return nil, lastErr + } - // Merge chunkSets in order because we wish to keep label series together consecutively - return nWayIntersect(chunkSets), lastErr + level.Debug(log).Log("msg", "post intersection", "entries", len(chunkIDs)) + + // Convert IndexEntry's into chunks + return c.convertChunkIDsToChunks(ctx, chunkIDs) } func (c *Store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { @@ -495,44 +552,44 @@ func (c *Store) lookupEntriesByQuery(ctx context.Context, query IndexQuery) ([]I return entries, nil } -func (c *Store) convertIndexEntriesToChunks(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) (ByKey, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - - var chunkSet ByKey +func (c *Store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) { + result := make([]string, 0, len(entries)) for _, entry := range entries { - chunkKey, labelValue, metadataInIndex, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) + chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) if err != nil { return nil, err } - chunk, err := ParseExternalKey(userID, chunkKey) - if err != 
nil { - return nil, err + if matcher != nil && !matcher.Matches(string(labelValue)) { + level.Debug(util.WithContext(ctx, util.Logger)).Log("msg", "dropping chunk for non-matching label", "label", labelValue) + continue } + result = append(result, chunkKey) + } - // This can be removed in Dev 2017, 13 months after the last chunks - // was written with metadata in the index. - if metadataInIndex && entry.Value != nil { - if err := json.Unmarshal(entry.Value, &chunk); err != nil { - return nil, err - } - chunk.metadataInIndex = true - } + // Return ids sorted and deduped because they will be merged with other sets. + sort.Strings(result) + result = uniqueStrings(result) + return result, nil +} - if matcher != nil && !matcher.Matches(string(labelValue)) { - level.Debug(util.WithContext(ctx, util.Logger)).Log("msg", "dropping chunk for non-matching metric", "metric", chunk.Metric) - continue +func (c *Store) convertChunkIDsToChunks(ctx context.Context, chunkIDs []string) ([]Chunk, error) { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, err + } + + chunkSet := make([]Chunk, 0, len(chunkIDs)) + for _, chunkID := range chunkIDs { + chunk, err := ParseExternalKey(userID, chunkID) + if err != nil { + return nil, err } chunkSet = append(chunkSet, chunk) } - // Return chunks sorted and deduped because they will be merged with other sets - sort.Sort(chunkSet) - return unique(chunkSet), nil + return chunkSet, nil } func (c *Store) writeBackCache(ctx context.Context, chunks []Chunk) error { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 663b7329114..94bb6733e67 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -194,7 +194,8 @@ func TestChunkStore_Get(t *testing.T) { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, + 
schemaFactory: schema.fn, + QueryChunkLimit: 2e6, }) if err := store.Put(ctx, []Chunk{ @@ -333,7 +334,8 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { t.Run(fmt.Sprintf("%s / %s", tc.query, schema.name), func(t *testing.T) { t.Log("========= Running query", tc.query, "with schema", schema.name) store := newTestChunkStore(t, StoreConfig{ - schemaFactory: schema.fn, + schemaFactory: schema.fn, + QueryChunkLimit: 2e6, }) if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil { @@ -378,7 +380,8 @@ func TestChunkStoreRandom(t *testing.T) { for i := range schemas { schemas[i].store = newTestChunkStore(t, StoreConfig{ - schemaFactory: schemas[i].fn, + schemaFactory: schemas[i].fn, + QueryChunkLimit: 2e6, }) } @@ -447,7 +450,8 @@ func TestChunkStoreLeastRead(t *testing.T) { // Test we don't read too much from the index ctx := user.InjectOrgID(context.Background(), userID) store := newTestChunkStore(t, StoreConfig{ - schemaFactory: v6Schema, + schemaFactory: v6Schema, + QueryChunkLimit: 2e6, }) // Put 24 chunks 1hr chunks in the store diff --git a/pkg/chunk/storage/by_key_test.go b/pkg/chunk/storage/by_key_test.go new file mode 100644 index 00000000000..8d157b80956 --- /dev/null +++ b/pkg/chunk/storage/by_key_test.go @@ -0,0 +1,12 @@ +package storage + +import ( + "github.com/weaveworks/cortex/pkg/chunk" +) + +// ByKey allow you to sort chunks by ID +type ByKey []chunk.Chunk + +func (cs ByKey) Len() int { return len(cs) } +func (cs ByKey) Swap(i, j int) { cs[i], cs[j] = cs[j], cs[i] } +func (cs ByKey) Less(i, j int) bool { return cs[i].ExternalKey() < cs[j].ExternalKey() } diff --git a/pkg/chunk/storage/storage_client_test.go b/pkg/chunk/storage/storage_client_test.go index e3c5947daf2..c661a878721 100644 --- a/pkg/chunk/storage/storage_client_test.go +++ b/pkg/chunk/storage/storage_client_test.go @@ -49,8 +49,8 @@ func TestChunksBasic(t *testing.T) { require.NoError(t, err) require.Equal(t, len(chunksToGet), len(chunksWeGot)) - 
sort.Sort(chunk.ByKey(chunksToGet)) - sort.Sort(chunk.ByKey(chunksWeGot)) + sort.Sort(ByKey(chunksToGet)) + sort.Sort(ByKey(chunksWeGot)) for j := 0; j < len(chunksWeGot); j++ { require.Equal(t, chunksToGet[i].ExternalKey(), chunksWeGot[i].ExternalKey(), strconv.Itoa(i)) } diff --git a/pkg/chunk/strings.go b/pkg/chunk/strings.go new file mode 100644 index 00000000000..0d91e640d2d --- /dev/null +++ b/pkg/chunk/strings.go @@ -0,0 +1,59 @@ +package chunk + +func uniqueStrings(cs []string) []string { + if len(cs) == 0 { + return []string{} + } + + result := make([]string, 1, len(cs)) + result[0] = cs[0] + i, j := 0, 1 + for j < len(cs) { + if result[i] == cs[j] { + j++ + continue + } + result = append(result, cs[j]) + i++ + j++ + } + return result +} + +func intersectStrings(left, right []string) []string { + var ( + i, j = 0, 0 + result = []string{} + ) + for i < len(left) && j < len(right) { + if left[i] == right[j] { + result = append(result, left[i]) + } + + if left[i] < right[j] { + i++ + } else { + j++ + } + } + return result +} + +func nWayIntersectStrings(sets [][]string) []string { + l := len(sets) + switch l { + case 0: + return []string{} + case 1: + return sets[0] + case 2: + return intersectStrings(sets[0], sets[1]) + default: + var ( + split = l / 2 + left = nWayIntersectStrings(sets[:split]) + right = nWayIntersectStrings(sets[split:]) + ) + return intersectStrings(left, right) + } +}