diff --git a/chunk/chunk.go b/chunk/chunk.go index fda0cf6f9fd..a7be7c6fde1 100644 --- a/chunk/chunk.go +++ b/chunk/chunk.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" "io" + "strconv" + "strings" "github.com/docker/docker/pkg/ioutils" "github.com/golang/snappy" @@ -49,6 +51,26 @@ func NewChunk(fp model.Fingerprint, metric model.Metric, c *prom_chunk.Desc) Chu } } +func parseChunkID(id string) (model.Fingerprint, model.Time, model.Time, error) { + parts := strings.Split(id, ":") + if len(parts) != 3 { + return 0, 0, 0, fmt.Errorf("invalid chunk ID") + } + fingerprint, err := strconv.ParseUint(parts[0], 10, 64) + if err != nil { + return 0, 0, 0, err + } + firstTime, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return 0, 0, 0, err + } + lastTime, err := strconv.ParseInt(parts[2], 10, 64) + if err != nil { + return 0, 0, 0, err + } + return model.Fingerprint(fingerprint), model.Time(firstTime), model.Time(lastTime), nil +} + func (c *Chunk) reader() (io.ReadSeeker, error) { // Encode chunk metadata into snappy-compressed buffer var metadata bytes.Buffer diff --git a/chunk/chunk_store.go b/chunk/chunk_store.go index fdd7b4ec2c9..7d4418bd56b 100644 --- a/chunk/chunk_store.go +++ b/chunk/chunk_store.go @@ -417,14 +417,27 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers . } // TODO push ctx all the way through, so we can do cancellation (eventually!) - missing, err := c.lookupChunks(userID, from, through, matchers) + chunks, err := c.lookupChunks(userID, from, through, matchers) if err != nil { return nil, err } - queryChunks.Observe(float64(len(missing))) + filtered := make([]Chunk, 0, len(chunks)) + for _, chunk := range chunks { + _, chunkFrom, chunkThrough, err := parseChunkID(chunk.ID) + if err != nil { + return nil, err + } + if chunkThrough < from || through < chunkFrom { + continue + } + filtered = append(filtered, chunk) + } + + queryChunks.Observe(float64(len(filtered))) var fromCache []Chunk + var missing = filtered if c.chunkCache != nil { fromCache, missing, err = c.chunkCache.FetchChunkData(userID, missing) if err != nil { @@ -445,9 +458,9 @@ func (c *AWSStore) Get(ctx context.Context, from, through model.Time, matchers . // TODO instead of doing this sort, propagate an index and assign chunks // into the result based on that index. - chunks := append(fromCache, fromS3...) - sort.Sort(ByID(chunks)) - return chunks, nil + allChunks := append(fromCache, fromS3...) + sort.Sort(ByID(allChunks)) + return allChunks, nil } func extractMetricName(matchers []*metric.LabelMatcher) (model.LabelValue, []*metric.LabelMatcher, error) { diff --git a/chunk/chunk_store_test.go b/chunk/chunk_store_test.go index d06481a39fd..823829f6a20 100644 --- a/chunk/chunk_store_test.go +++ b/chunk/chunk_store_test.go @@ -58,30 +58,32 @@ func TestChunkStore(t *testing.T) { chunks, _ := chunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) - chunk1 := Chunk{ - ID: "chunk1", - From: now.Add(-time.Hour), - Through: now, - Metric: model.Metric{ + chunk1 := NewChunk( + model.Fingerprint(1), + model.Metric{ model.MetricNameLabel: "foo", "bar": "baz", "toms": "code", }, - Encoding: chunk.DoubleDelta, - Data: chunks[0], - } - chunk2 := Chunk{ - ID: "chunk2", - From: now.Add(-time.Hour), - Through: now, - Metric: model.Metric{ + &chunk.Desc{ + ChunkFirstTime: now.Add(-time.Hour), + ChunkLastTime: now, + C: chunks[0], + }, + ) + chunk2 := NewChunk( + model.Fingerprint(2), + model.Metric{ model.MetricNameLabel: "foo", "bar": "beep", "toms": "code", }, - Encoding: chunk.DoubleDelta, - Data: chunks[0], - } + &chunk.Desc{ + ChunkFirstTime: now.Add(-time.Hour), + ChunkLastTime: now, + C: chunks[0], + }, + ) err := store.Put(ctx, []Chunk{chunk1, chunk2}) if err != nil { @@ -96,7 +98,7 @@ func TestChunkStore(t *testing.T) { } if !reflect.DeepEqual(expect, chunks) { - t.Fatalf("wrong chunks - " + diff(expect, chunks)) + t.Fatalf("%s: wrong chunks - %s", name, diff(expect, chunks)) } }