diff --git a/Gopkg.lock b/Gopkg.lock index f35732327e8..2a0c8fa99df 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1411,6 +1411,7 @@ "github.com/prometheus/prometheus/util/strutil", "github.com/prometheus/prometheus/web/api/v1", "github.com/prometheus/tsdb", + "github.com/prometheus/tsdb/chunkenc", "github.com/prometheus/tsdb/fileutil", "github.com/segmentio/fasthash/fnv1a", "github.com/stretchr/testify/assert", diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index 706d340a680..b87fb83921b 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -12,7 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" - prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 0b5cb30e7c4..78de06218be 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -10,7 +10,7 @@ import ( "strings" "sync" - prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" @@ -28,6 +28,7 @@ const ( ErrInvalidChecksum = errs.Error("invalid chunk checksum") ErrWrongMetadata = errs.Error("wrong chunk metadata") ErrMetadataLength = errs.Error("chunk metadata wrong length") + ErrDataLength = errs.Error("chunk data wrong length") ) var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) @@ -213,12 +214,12 @@ func (c *Chunk) Encode() ([]byte, error) { // Write the metadata length back at the start of the buffer. // (note this length includes the 4 bytes for the length itself) - binary.BigEndian.PutUint32(metadataLenBytes[:], uint32(buf.Len())) + metadataLen := buf.Len() + binary.BigEndian.PutUint32(metadataLenBytes[:], uint32(metadataLen)) copy(buf.Bytes(), metadataLenBytes[:]) - // Write the data length + // Write another 4 empty bytes - we will come back and put the len in here. dataLenBytes := [4]byte{} - binary.BigEndian.PutUint32(dataLenBytes[:], uint32(prom_chunk.ChunkLen)) if _, err := buf.Write(dataLenBytes[:]); err != nil { return nil, err } @@ -228,6 +229,10 @@ func (c *Chunk) Encode() ([]byte, error) { return nil, err } + // Now write the data len back into the buf. 
+ binary.BigEndian.PutUint32(dataLenBytes[:], uint32(buf.Len()-metadataLen-4)) + copy(buf.Bytes()[metadataLen:], dataLenBytes[:]) + // Now work out the checksum c.encoded = buf.Bytes() c.ChecksumSet = true @@ -314,6 +319,10 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { c.encoded = input remainingData := input[len(input)-r.Len():] + if int(dataLen) != len(remainingData) { + return ErrDataLength + } + return c.Data.UnmarshalFromBuf(remainingData[:int(dataLen)]) } diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 310ea8fbe16..d123b39be23 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -16,7 +16,7 @@ import ( "golang.org/x/net/context" "github.com/cortexproject/cortex/pkg/chunk/cache" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/validation" @@ -415,7 +415,7 @@ func TestChunkStoreRandom(t *testing.T) { const chunkLen = 2 * 3600 // in seconds for i := 0; i < 100; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := chunk.New().Add(model.SamplePair{ + chunks, _ := encoding.New().Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(float64(i)), }) @@ -479,7 +479,7 @@ func TestChunkStoreLeastRead(t *testing.T) { const chunkLen = 60 // in seconds for i := 0; i < 24; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := chunk.New().Add(model.SamplePair{ + chunks, _ := encoding.New().Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(float64(i)), }) diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index 66d252ec617..a396a665074 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/util" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -24,13 +24,21 @@ func dummyChunk(now model.Time) Chunk { }) } -func dummyChunkFor(now model.Time, metric model.Metric) Chunk { - cs, _ := chunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) +func dummyChunkForEncoding(now model.Time, metric model.Metric, enc encoding.Encoding, samples int) Chunk { + c, _ := encoding.NewForEncoding(enc) + for i := 0; i < samples; i++ { + t := time.Duration(i) * 15 * time.Second + cs, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0}) + if err != nil { + panic(err) + } + c = cs[0] + } chunk := NewChunk( userID, metric.Fingerprint(), metric, - cs[0], + c, now.Add(-time.Hour), now, ) @@ -42,6 +50,10 @@ func dummyChunkFor(now model.Time, metric model.Metric) Chunk { return chunk } +func dummyChunkFor(now model.Time, metric model.Metric) Chunk { + return dummyChunkForEncoding(now, metric, encoding.Varbit, 1) +} + func TestChunkCodec(t *testing.T) { dummy := dummyChunk(model.Now()) decodeContext := NewDecodeContext() diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go new file mode 100644 index 00000000000..50e8a318bec --- /dev/null +++ b/pkg/chunk/encoding/bigchunk.go @@ -0,0 +1,322 @@ +package encoding + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "io/ioutil" + + "github.com/prometheus/common/model" + "github.com/prometheus/tsdb/chunkenc" +) + +const samplesPerChunk = 120 + +var errOutOfBounds = 
errors.New("out of bounds") + +// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no +// upperbound on number of samples it can contain. +type bigchunk struct { + chunks []chunkenc.Chunk + starts []int64 + ends []int64 + + appender chunkenc.Appender + remainingSamples int +} + +func newBigchunk() *bigchunk { + return &bigchunk{} +} + +func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { + if b.remainingSamples == 0 { + if err := b.addNextChunk(sample.Timestamp); err != nil { + return nil, err + } + } + + b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) + b.remainingSamples-- + b.ends[len(b.ends)-1] = int64(sample.Timestamp) + return []Chunk{b}, nil +} + +// addNextChunk adds a new XOR "subchunk" to the internal list of chunks. +func (b *bigchunk) addNextChunk(start model.Time) error { + // To save memory, we "compact" the previous chunk - the array backing the slice + // will be upto 2x too big, and we can save this space. + if l := len(b.chunks); l > 0 { + c := b.chunks[l-1] + buf := make([]byte, len(c.Bytes())) + copy(buf, c.Bytes()) + compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf) + if err != nil { + return err + } + b.chunks[l-1] = compacted + } + + chunk := chunkenc.NewXORChunk() + appender, err := chunk.Appender() + if err != nil { + return err + } + + b.starts = append(b.starts, int64(start)) + b.ends = append(b.ends, int64(start)) + b.chunks = append(b.chunks, chunk) + + b.appender = appender + b.remainingSamples = samplesPerChunk + return nil +} + +func (b *bigchunk) Marshal(wio io.Writer) error { + w := writer{wio} + if err := w.WriteVarInt16(uint16(len(b.chunks))); err != nil { + return err + } + for _, chunk := range b.chunks { + buf := chunk.Bytes() + if err := w.WriteVarInt16(uint16(len(buf))); err != nil { + return err + } + if _, err := w.Write(buf); err != nil { + return err + } + } + return nil +} + +func (b *bigchunk) MarshalToBuf(buf []byte) error { + writer := bytes.NewBuffer(buf) + return b.Marshal(writer) +} + +func (b *bigchunk) Unmarshal(r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + return b.UnmarshalFromBuf(buf) +} + +func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { + r := reader{buf: buf} + numChunks, err := r.ReadUint16() + if err != nil { + return err + } + + b.chunks = make([]chunkenc.Chunk, 0, numChunks) + for i := uint16(0); i < numChunks; i++ { + chunkLen, err := r.ReadUint16() + if err != nil { + return err + } + + chunkBuf, err := r.ReadBytes(int(chunkLen)) + if err != nil { + return err + } + + chunk, err := chunkenc.FromData(chunkenc.EncXOR, chunkBuf) + if err != nil { + return err + } + + start, end, err := firstAndLastTimes(chunk) + if err != nil { + return err + } + + b.chunks = append(b.chunks, chunk) + b.starts = append(b.starts, start) + b.ends = append(b.ends, end) + } + return nil +} + +func (b *bigchunk) Encoding() Encoding { + return Bigchunk +} + +func (b *bigchunk) Utilization() float64 { + return 1.0 +} + +func (b *bigchunk) Len() int { + sum := 0 + for _, c := range b.chunks { + sum += c.NumSamples() + } + return sum +} + +func (b *bigchunk) Size() int { + sum := 0 + for _, c := range b.chunks { + sum += len(c.Bytes()) + } + return sum +} + +func (b *bigchunk) NewIterator() Iterator { + return &bigchunkIterator{ + bigchunk: b, + } +} + +func (b *bigchunk) Slice(start, end model.Time) Chunk { + i, j := 0, len(b.chunks) + for k := 0; k < len(b.chunks); k++ { + if b.ends[k] < int64(start) { + i = k + 1 + } + if b.starts[k] > 
int64(end) { + j = k + break + } + } + return &bigchunk{ + chunks: b.chunks[i:j], + starts: b.starts[i:j], + ends: b.ends[i:j], + } +} + +type writer struct { + io.Writer +} + +func (w writer) WriteVarInt16(i uint16) error { + var b [2]byte + binary.LittleEndian.PutUint16(b[:], i) + _, err := w.Write(b[:]) + return err +} + +type reader struct { + i int + buf []byte +} + +func (r *reader) ReadUint16() (uint16, error) { + if r.i+2 > len(r.buf) { + return 0, errOutOfBounds + } + result := binary.LittleEndian.Uint16(r.buf[r.i:]) + r.i += 2 + return result, nil +} + +func (r *reader) ReadBytes(count int) ([]byte, error) { + if r.i+count > len(r.buf) { + return nil, errOutOfBounds + } + result := r.buf[r.i : r.i+count] + r.i += count + return result, nil +} + +type bigchunkIterator struct { + *bigchunk + + iter chunkenc.Iterator + i int +} + +func (i *bigchunkIterator) FindAtOrAfter(target model.Time) bool { + // On average we'll have about 12*3600/15/120 = 24 chunks, so just linear + // scan for now. + i.i = 0 + for i.i < len(i.chunks) { + if int64(target) <= i.ends[i.i] { + break + } + i.i++ + } + + if i.i >= len(i.chunks) { + return false + } + + i.iter = i.chunks[i.i].Iterator() + i.i++ + + for i.iter.Next() { + t, _ := i.iter.At() + if t >= int64(target) { + return true + } + } + + return false +} + +func (i *bigchunkIterator) Scan() bool { + if i.iter != nil && i.iter.Next() { + return true + } + + for i.i < len(i.chunks) { + i.iter = i.chunks[i.i].Iterator() + i.i++ + if i.iter.Next() { + return true + } + } + return false +} + +func (i *bigchunkIterator) Value() model.SamplePair { + t, v := i.iter.At() + return model.SamplePair{ + Timestamp: model.Time(t), + Value: model.SampleValue(v), + } +} + +func (i *bigchunkIterator) Batch(size int) Batch { + var result Batch + j := 0 + for j < size { + t, v := i.iter.At() + result.Timestamps[j] = t + result.Values[j] = v + j++ + + if j < size && !i.Scan() { + break + } + } + result.Length = j + return result +} + +func (i *bigchunkIterator) Err() error { + if i.iter != nil { + return i.iter.Err() + } + return nil +} + +func firstAndLastTimes(c chunkenc.Chunk) (int64, int64, error) { + var ( + first int64 + last int64 + firstSet bool + iter = c.Iterator() + ) + for iter.Next() { + t, _ := iter.At() + if !firstSet { + first = t + firstSet = true + } + last = t + } + return first, last, iter.Err() +} diff --git a/pkg/chunk/encoding/bigchunk_test.go b/pkg/chunk/encoding/bigchunk_test.go new file mode 100644 index 00000000000..6c3bd432eaa --- /dev/null +++ b/pkg/chunk/encoding/bigchunk_test.go @@ -0,0 +1,89 @@ +package encoding + +import ( + "bytes" + "fmt" + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestSliceBiggerChunk(t *testing.T) { + var c Chunk = newBigchunk() + for i := 0; i < 12*3600/15; i++ { + cs, err := c.Add(model.SamplePair{ + Timestamp: model.Time(i * step), + Value: model.SampleValue(i), + }) + require.NoError(t, err) + c = cs[0] + } + + // Test for when the slice aligns perfectly with the sub-chunk boundaries. 
+ + for i := 0; i < (12*3600/15)-480; i += 120 { + s := c.Slice(model.Time(i*step), model.Time((i+479)*step)) + iter := s.NewIterator() + for j := i; j < i+480; j++ { + require.True(t, iter.Scan()) + sample := iter.Value() + require.Equal(t, sample.Timestamp, model.Time(j*step)) + require.Equal(t, sample.Value, model.SampleValue(j)) + } + require.False(t, iter.Scan()) + require.NoError(t, iter.Err()) + } + + // Test for when the slice does not align perfectly with the sub-chunk boundaries. + for i := 0; i < (12*3600/15)-500; i += 100 { + s := c.Slice(model.Time(i*step), model.Time((i+500)*step)) + iter := s.NewIterator() + + // Consume some samples until we get to where we want to be. + for { + require.True(t, iter.Scan()) + sample := iter.Value() + if sample.Timestamp == model.Time(i*step) { + break + } + } + + for j := i; j < i+500; j++ { + sample := iter.Value() + require.Equal(t, sample.Timestamp, model.Time(j*step)) + require.Equal(t, sample.Value, model.SampleValue(j)) + require.True(t, iter.Scan()) + } + } +} + +func BenchmarkBiggerChunkMemory(b *testing.B) { + for i := 0; i < b.N; i++ { + var c Chunk = newBigchunk() + for i := 0; i < 12*3600/15; i++ { + cs, err := c.Add(model.SamplePair{ + Timestamp: model.Time(i * step), + Value: model.SampleValue(i), + }) + require.NoError(b, err) + c = cs[0] + } + + c.(*bigchunk).printSize() + } +} + +// printSize calculates various sizes of the chunk when encoded, and in memory. +func (b *bigchunk) printSize() { + var buf bytes.Buffer + b.Marshal(&buf) + + var size, allocd int + for _, c := range b.chunks { + size += len(c.Bytes()) + allocd += cap(c.Bytes()) + } + + fmt.Println("encodedlen =", len(buf.Bytes()), "subchunks =", len(b.chunks), "len =", size, "cap =", allocd) +} diff --git a/pkg/prom1/storage/local/chunk/chunk.go b/pkg/chunk/encoding/chunk.go similarity index 50% rename from pkg/prom1/storage/local/chunk/chunk.go rename to pkg/chunk/encoding/chunk.go index df5ef9e99f4..3ed281af2f3 100644 --- a/pkg/prom1/storage/local/chunk/chunk.go +++ b/pkg/chunk/encoding/chunk.go @@ -14,16 +14,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( - "container/list" "errors" "fmt" "io" "sort" - "sync" - "sync/atomic" "github.com/prometheus/common/model" @@ -41,12 +38,6 @@ var ( errAddedToEvictedChunk = errors.New("attempted to add sample to evicted chunk") ) -// EvictRequest is a request to evict a chunk from memory. -type EvictRequest struct { - Desc *Desc - Evict bool -} - // Encoding defines which encoding we are using, delta, doubledelta, or varbit type Encoding byte @@ -64,6 +55,8 @@ func (e *Encoding) Set(s string) error { *e = DoubleDelta case "2": *e = Varbit + case "3": + *e = Bigchunk default: return fmt.Errorf("invalid chunk encoding: %s", s) } @@ -77,195 +70,10 @@ const ( DoubleDelta // Varbit encoding Varbit + // Bigchunk encoding + Bigchunk ) -// Desc contains meta-data for a chunk. Pay special attention to the -// documented requirements for calling its methods concurrently (WRT pinning and -// locking). The doc comments spell out the requirements for each method, but -// here is an overview and general explanation: -// -// Everything that changes the pinning of the underlying chunk or deals with its -// eviction is protected by a mutex. This affects the following methods: Pin, -// Unpin, RefCount, IsEvicted, MaybeEvict. These methods can be called at any -// time without further prerequisites. 
-// -// Another group of methods acts on (or sets) the underlying chunk. These -// methods involve no locking. They may only be called if the caller has pinned -// the chunk (to guarantee the chunk is not evicted concurrently). Also, the -// caller must make sure nobody else will call these methods concurrently, -// either by holding the sole reference to the Desc (usually during loading -// or creation) or by locking the fingerprint of the series the Desc -// belongs to. The affected methods are: Add, MaybePopulateLastTime, SetChunk. -// -// Finally, there are the special cases FirstTime and LastTime. LastTime requires -// to have locked the fingerprint of the series but the chunk does not need to -// be pinned. That's because the ChunkLastTime field in Desc gets populated -// upon completion of the chunk (when it is still pinned, and which happens -// while the series's fingerprint is locked). Once that has happened, calling -// LastTime does not require the chunk to be loaded anymore. Before that has -// happened, the chunk is pinned anyway. The ChunkFirstTime field in Desc -// is populated upon creation of a Desc, so it is alway safe to call -// FirstTime. The FirstTime method is arguably not needed and only there for -// consistency with LastTime. -type Desc struct { - sync.Mutex // Protects pinning. - C Chunk // nil if chunk is evicted. - rCnt int - ChunkFirstTime model.Time // Populated at creation. Immutable. - ChunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. - - // EvictListElement is nil if the chunk is not in the evict list. - // EvictListElement is _not_ protected by the Desc mutex. - // It must only be touched by the evict list handler in MemorySeriesStorage. - EvictListElement *list.Element -} - -// NewDesc creates a new Desc pointing to the provided chunk. The provided chunk -// is assumed to be not persisted yet. Therefore, the refCount of the new -// Desc is 1 (preventing eviction prior to persisting). -func NewDesc(c Chunk, firstTime model.Time) *Desc { - Ops.WithLabelValues(CreateAndPin).Inc() - atomic.AddInt64(&NumMemChunks, 1) - NumMemDescs.Inc() - return &Desc{ - C: c, - rCnt: 1, - ChunkFirstTime: firstTime, - ChunkLastTime: model.Earliest, - } -} - -// Add adds a sample pair to the underlying chunk. For safe concurrent access, -// The chunk must be pinned, and the caller must have locked the fingerprint of -// the series. -func (d *Desc) Add(s model.SamplePair) ([]Chunk, error) { - if d.C == nil { - return nil, errAddedToEvictedChunk - } - return d.C.Add(s) -} - -// Pin increments the refCount by one. Upon increment from 0 to 1, this -// Desc is removed from the evict list. To enable the latter, the -// evictRequests channel has to be provided. This method can be called -// concurrently at any time. -func (d *Desc) Pin(evictRequests chan<- EvictRequest) { - d.Lock() - defer d.Unlock() - - if d.rCnt == 0 { - // Remove ourselves from the evict list. - evictRequests <- EvictRequest{d, false} - } - d.rCnt++ -} - -// Unpin decrements the refCount by one. Upon decrement from 1 to 0, this -// Desc is added to the evict list. To enable the latter, the evictRequests -// channel has to be provided. This method can be called concurrently at any -// time. -func (d *Desc) Unpin(evictRequests chan<- EvictRequest) { - d.Lock() - defer d.Unlock() - - if d.rCnt == 0 { - panic("cannot unpin already unpinned chunk") - } - d.rCnt-- - if d.rCnt == 0 { - // Add ourselves to the back of the evict list. 
-		evictRequests <- EvictRequest{d, true}
-	}
-}
-
-// RefCount returns the number of pins. This method can be called concurrently
-// at any time.
-func (d *Desc) RefCount() int {
-	d.Lock()
-	defer d.Unlock()
-
-	return d.rCnt
-}
-
-// FirstTime returns the timestamp of the first sample in the chunk. This method
-// can be called concurrently at any time. It only returns the immutable
-// d.ChunkFirstTime without any locking. Arguably, this method is
-// useless. However, it provides consistency with the LastTime method.
-func (d *Desc) FirstTime() model.Time {
-	return d.ChunkFirstTime
-}
-
-// LastTime returns the timestamp of the last sample in the chunk. For safe
-// concurrent access, this method requires the fingerprint of the time series to
-// be locked.
-func (d *Desc) LastTime() (model.Time, error) {
-	if d.ChunkLastTime != model.Earliest || d.C == nil {
-		return d.ChunkLastTime, nil
-	}
-	return d.C.NewIterator().LastTimestamp()
-}
-
-// MaybePopulateLastTime populates the ChunkLastTime from the underlying chunk
-// if it has not yet happened. Call this method directly after having added the
-// last sample to a chunk or after closing a head chunk due to age. For safe
-// concurrent access, the chunk must be pinned, and the caller must have locked
-// the fingerprint of the series.
-func (d *Desc) MaybePopulateLastTime() error {
-	if d.ChunkLastTime == model.Earliest && d.C != nil {
-		t, err := d.C.NewIterator().LastTimestamp()
-		if err != nil {
-			return err
-		}
-		d.ChunkLastTime = t
-	}
-	return nil
-}
-
-// IsEvicted returns whether the chunk is evicted. For safe concurrent access,
-// the caller must have locked the fingerprint of the series.
-func (d *Desc) IsEvicted() bool {
-	// Locking required here because we do not want the caller to force
-	// pinning the chunk first, so it could be evicted while this method is
-	// called.
-	d.Lock()
-	defer d.Unlock()
-
-	return d.C == nil
-}
-
-// SetChunk sets the underlying chunk. The caller must have locked the
-// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first
-// call Pin and then set the chunk).
-func (d *Desc) SetChunk(c Chunk) {
-	if d.C != nil {
-		panic("chunk already set")
-	}
-	d.C = c
-}
-
-// MaybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
-// is now evicted, which includes the case that the chunk was evicted even
-// before this method was called. It can be called concurrently at any time.
-func (d *Desc) MaybeEvict() bool {
-	d.Lock()
-	defer d.Unlock()
-
-	if d.C == nil {
-		return true
-	}
-	if d.rCnt != 0 {
-		return false
-	}
-	if d.ChunkLastTime == model.Earliest {
-		// This must never happen.
-		panic("ChunkLastTime not populated for evicted chunk")
-	}
-	d.C = nil
-	Ops.WithLabelValues(Evict).Inc()
-	atomic.AddInt64(&NumMemChunks, -1)
-	return true
-}
-
 // Chunk is the interface for all chunks. Chunks are generally not
 // goroutine-safe.
 type Chunk interface {
@@ -276,40 +84,35 @@ type Chunk interface {
 	// or a newly allocated version. In any case, take the returned chunk as
 	// the relevant one and discard the original chunk.
 	Add(sample model.SamplePair) ([]Chunk, error)
-	Clone() Chunk
-	FirstTime() model.Time
 	NewIterator() Iterator
 	Marshal(io.Writer) error
-	MarshalToBuf([]byte) error
 	Unmarshal(io.Reader) error
 	UnmarshalFromBuf([]byte) error
 	Encoding() Encoding
 	Utilization() float64
+	// Slice returns a smaller chunk that includes all samples between start and end
+	// (inclusive). It may over-estimate; on some encodings it is a no-op.
+ Slice(start, end model.Time) Chunk + // Len returns the number of samples in the chunk. Implementations may be // expensive. Len() int + + // Size returns the approximate length of the chunk in bytes. + Size() int } // Iterator enables efficient access to the content of a chunk. It is // generally not safe to use an Iterator concurrently with or after chunk // mutation. type Iterator interface { - // Gets the last timestamp in the chunk. - LastTimestamp() (model.Time, error) - // Whether a given timestamp is contained between first and last value - // in the chunk. - Contains(model.Time) (bool, error) // Scans the next value in the chunk. Directly after the iterator has // been created, the next value is the first value in the // chunk. Otherwise, it is the value following the last value scanned or // found (by one of the Find... methods). Returns false if either the // end of the chunk is reached or an error has occurred. Scan() bool - // Finds the most recent value at or before the provided time. Returns - // false if either the chunk contains no value at or before the provided - // time, or an error has occurred. - FindAtOrBefore(model.Time) bool // Finds the oldest value at or after the provided time. Returns false // if either the chunk contains no value at or after the provided time, // or an error has occurred. @@ -319,7 +122,7 @@ type Iterator interface { // those methods were called. Value() model.SamplePair // Returns a batch of the provisded size; NB not idempotent! Should only be called - // once per Scan/FindAtOrBefore. + // once per Scan. Batch(size int) Batch // Returns the last error encountered. In general, an error signals data // corruption in the chunk and requires quarantining. @@ -416,6 +219,8 @@ func NewForEncoding(encoding Encoding) (Chunk, error) { return newDoubleDeltaEncodedChunk(d1, d0, true, ChunkLen), nil case Varbit: return newVarbitChunk(varbitZeroEncoding), nil + case Bigchunk: + return newBigchunk(), nil default: return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } @@ -446,17 +251,6 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC } } -// lastTimestamp implements Iterator. -func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) { - return it.acc.timestampAtIndex(it.len - 1), it.acc.err() -} - -// contains implements Iterator. -func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) { - return !t.Before(it.acc.timestampAtIndex(0)) && - !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() -} - // scan implements Iterator. func (it *indexAccessingChunkIterator) Scan() bool { it.pos++ @@ -470,22 +264,6 @@ func (it *indexAccessingChunkIterator) Scan() bool { return it.acc.err() == nil } -// findAtOrBefore implements Iterator. -func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool { - i := sort.Search(it.len, func(i int) bool { - return it.acc.timestampAtIndex(i).After(t) - }) - if i == 0 || it.acc.err() != nil { - return false - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.acc.timestampAtIndex(i - 1), - Value: it.acc.sampleValueAtIndex(i - 1), - } - return true -} - // findAtOrAfter implements Iterator. 
 func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool {
 	i := sort.Search(it.len, func(i int) bool {
diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go
new file mode 100644
index 00000000000..b4a3acf7313
--- /dev/null
+++ b/pkg/chunk/encoding/chunk_test.go
@@ -0,0 +1,176 @@
+// Copyright 2016 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Note: this file has tests for code in both delta.go and doubledelta.go --
+// it may make sense to split those out later, but given that the tests are
+// near-identical and share a helper, this feels simpler for now.
+
+package encoding
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLen(t *testing.T) {
+	chunks := []Chunk{}
+	for _, encoding := range []Encoding{Delta, DoubleDelta, Varbit} {
+		c, err := NewForEncoding(encoding)
+		if err != nil {
+			t.Fatal(err)
+		}
+		chunks = append(chunks, c)
+	}
+
+	for _, c := range chunks {
+		for i := 0; i <= 10; i++ {
+			if c.Len() != i {
+				t.Errorf("chunk type %s should have %d samples, had %d", c.Encoding(), i, c.Len())
+			}
+
+			cs, _ := c.Add(model.SamplePair{
+				Timestamp: model.Time(i),
+				Value:     model.SampleValue(i),
+			})
+			c = cs[0]
+		}
+	}
+}
+
+var step = int(15 * time.Second / time.Millisecond)
+
+func TestChunk(t *testing.T) {
+	for _, tc := range []struct {
+		encoding   Encoding
+		maxSamples int
+	}{
+		{DoubleDelta, 989},
+		{Varbit, 2048},
+		{Bigchunk, 4096},
+	} {
+		for samples := 0; samples < tc.maxSamples; samples += tc.maxSamples / 10 {
+
+			// DoubleDelta doesn't support zero length chunks.
+			if tc.encoding == DoubleDelta && samples == 0 {
+				continue
+			}
+
+			t.Run(fmt.Sprintf("testChunkEncoding/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
+				testChunkEncoding(t, tc.encoding, samples)
+			})
+
+			t.Run(fmt.Sprintf("testChunkSeek/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
+				testChunkSeek(t, tc.encoding, samples)
+			})
+
+			t.Run(fmt.Sprintf("testChunkBatch/%s/%d", tc.encoding.String(), samples), func(t *testing.T) {
+				testChunkBatch(t, tc.encoding, samples)
+			})
+		}
+	}
+}
+
+func mkChunk(t *testing.T, encoding Encoding, samples int) Chunk {
+	chunk, err := NewForEncoding(encoding)
+	require.NoError(t, err)
+
+	for i := 0; i < samples; i++ {
+		chunks, err := chunk.Add(model.SamplePair{
+			Timestamp: model.Time(i * step),
+			Value:     model.SampleValue(i),
+		})
+		require.NoError(t, err)
+		require.Len(t, chunks, 1)
+		chunk = chunks[0]
+	}
+
+	return chunk
+}
+
+// testChunkEncoding checks chunks roundtrip and contain all their samples.
+func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
+	chunk := mkChunk(t, encoding, samples)
+
+	var buf bytes.Buffer
+	err := chunk.Marshal(&buf)
+	require.NoError(t, err)
+
+	bs1 := buf.Bytes()
+	chunk, err = NewForEncoding(encoding)
+	require.NoError(t, err)
+	err = chunk.Unmarshal(&buf)
+	require.NoError(t, err)
+
+	// Check all the samples are in there.
+	iter := chunk.NewIterator()
+	for i := 0; i < samples; i++ {
+		require.True(t, iter.Scan())
+		sample := iter.Value()
+		require.EqualValues(t, model.Time(i*step), sample.Timestamp)
+		require.EqualValues(t, model.SampleValue(i), sample.Value)
+	}
+	require.False(t, iter.Scan())
+	require.NoError(t, iter.Err())
+
+	// Check the byte representation after another Marshal is the same.
+	err = chunk.Marshal(&buf)
+	require.NoError(t, err)
+	bs2 := buf.Bytes()
+
+	require.True(t, bytes.Equal(bs1, bs2))
+}
+
+// testChunkSeek checks seek works as expected.
+func testChunkSeek(t *testing.T, encoding Encoding, samples int) {
+	chunk := mkChunk(t, encoding, samples)
+
+	iter := chunk.NewIterator()
+	for i := 0; i < samples; i += samples / 10 {
+		require.True(t, iter.FindAtOrAfter(model.Time(i*step)))
+		sample := iter.Value()
+		require.EqualValues(t, model.Time(i*step), sample.Timestamp)
+		require.EqualValues(t, model.SampleValue(i), sample.Value)
+
+		j := i + 1
+		for ; j < samples; j++ {
+			require.True(t, iter.Scan())
+			sample := iter.Value()
+			require.EqualValues(t, model.Time(j*step), sample.Timestamp)
+			require.EqualValues(t, model.SampleValue(j), sample.Value)
+		}
+		require.False(t, iter.Scan())
+		require.NoError(t, iter.Err())
+	}
+}
+
+func testChunkBatch(t *testing.T, encoding Encoding, samples int) {
+	chunk := mkChunk(t, encoding, samples)
+
+	// Check all the samples are in there.
+	iter := chunk.NewIterator()
+	for i := 0; i < samples; {
+		require.True(t, iter.Scan())
+		batch := iter.Batch(BatchSize)
+		for j := 0; j < batch.Length; j++ {
+			require.EqualValues(t, int64((i+j)*step), batch.Timestamps[j])
+			require.EqualValues(t, float64(i+j), batch.Values[j])
+		}
+		i += batch.Length
+	}
+	require.False(t, iter.Scan())
+	require.NoError(t, iter.Err())
+}
diff --git a/pkg/prom1/storage/local/chunk/delta.go b/pkg/chunk/encoding/delta.go
similarity index 94%
rename from pkg/prom1/storage/local/chunk/delta.go
rename to pkg/chunk/encoding/delta.go
index 0fa2f917bc1..dfe4659d1d5 100644
--- a/pkg/prom1/storage/local/chunk/delta.go
+++ b/pkg/chunk/encoding/delta.go
@@ -14,7 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package chunk
+package encoding
 
 import (
 	"encoding/binary"
@@ -180,16 +180,8 @@ func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
 	return []Chunk{&c}, nil
 }
 
-// Clone implements chunk.
-func (c deltaEncodedChunk) Clone() Chunk {
-	clone := make(deltaEncodedChunk, len(c), cap(c))
-	copy(clone, c)
-	return &clone
-}
-
-// FirstTime implements chunk.
-func (c deltaEncodedChunk) FirstTime() model.Time {
-	return c.baseTime()
+func (c *deltaEncodedChunk) Slice(_, _ model.Time) Chunk {
+	return c
 }
 
 // NewIterator implements chunk.
@@ -221,20 +213,6 @@ func (c deltaEncodedChunk) Marshal(w io.Writer) error {
 	return nil
 }
 
-// MarshalToBuf implements chunk.
-func (c deltaEncodedChunk) MarshalToBuf(buf []byte) error {
-	if len(c) > math.MaxUint16 {
-		panic("chunk buffer length would overflow a 16 bit uint")
-	}
-	binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c)))
-
-	n := copy(buf, c)
-	if n != len(c) {
-		return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
-	}
-	return nil
-}
-
 // Unmarshal implements chunk.
func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] @@ -316,6 +294,10 @@ func (c deltaEncodedChunk) Len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } +func (c deltaEncodedChunk) Size() int { + return len(c) +} + // deltaEncodedIndexAccessor implements indexAccessor. type deltaEncodedIndexAccessor struct { c deltaEncodedChunk diff --git a/pkg/prom1/storage/local/chunk/delta_helpers.go b/pkg/chunk/encoding/delta_helpers.go similarity index 99% rename from pkg/prom1/storage/local/chunk/delta_helpers.go rename to pkg/chunk/encoding/delta_helpers.go index 7b64db53f06..c34ab8555a3 100644 --- a/pkg/prom1/storage/local/chunk/delta_helpers.go +++ b/pkg/chunk/encoding/delta_helpers.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "math" diff --git a/pkg/prom1/storage/local/chunk/delta_test.go b/pkg/chunk/encoding/delta_test.go similarity index 97% rename from pkg/prom1/storage/local/chunk/delta_test.go rename to pkg/chunk/encoding/delta_test.go index 93b7f9695b4..01700861dc3 100644 --- a/pkg/prom1/storage/local/chunk/delta_test.go +++ b/pkg/chunk/encoding/delta_test.go @@ -18,7 +18,7 @@ // it may make sense to split those out later, but given that the tests are // near-identical and share a helper, this feels simpler for now. -package chunk +package encoding import ( "bytes" @@ -90,11 +90,11 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err) } - buf := make([]byte, ChunkLen) - - cs[0].MarshalToBuf(buf) + var writer bytes.Buffer + cs[0].Marshal(&writer) // Corrupt time byte to 0, which is illegal. + buf := writer.Bytes() buf[c.timeBytesPos] = 0 err = cs[0].UnmarshalFromBuf(buf) verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes") diff --git a/pkg/prom1/storage/local/chunk/doubledelta.go b/pkg/chunk/encoding/doubledelta.go similarity index 98% rename from pkg/prom1/storage/local/chunk/doubledelta.go rename to pkg/chunk/encoding/doubledelta.go index 3c6537f63ac..59d9f95da2b 100644 --- a/pkg/prom1/storage/local/chunk/doubledelta.go +++ b/pkg/chunk/encoding/doubledelta.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "encoding/binary" @@ -187,13 +187,6 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { return []Chunk{&c}, nil } -// Clone implements chunk. -func (c doubleDeltaEncodedChunk) Clone() Chunk { - clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - // FirstTime implements chunk. func (c doubleDeltaEncodedChunk) FirstTime() model.Time { return c.baseTime() @@ -213,6 +206,10 @@ func (c *doubleDeltaEncodedChunk) NewIterator() Iterator { }) } +func (c *doubleDeltaEncodedChunk) Slice(_, _ model.Time) Chunk { + return c +} + // Marshal implements chunk. 
func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error { if len(c) > math.MaxUint16 { @@ -358,6 +355,10 @@ func (c doubleDeltaEncodedChunk) Len() int { return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2 } +func (c doubleDeltaEncodedChunk) Size() int { + return len(c) +} + func (c doubleDeltaEncodedChunk) isInt() bool { return c[doubleDeltaHeaderIsIntOffset] == 1 } diff --git a/pkg/prom1/storage/local/chunk/instrumentation.go b/pkg/chunk/encoding/instrumentation.go similarity index 97% rename from pkg/prom1/storage/local/chunk/instrumentation.go rename to pkg/chunk/encoding/instrumentation.go index c73fe991089..241b88c48bc 100644 --- a/pkg/prom1/storage/local/chunk/instrumentation.go +++ b/pkg/chunk/encoding/instrumentation.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import "github.com/prometheus/client_golang/prometheus" @@ -65,8 +65,6 @@ const ( Pin = "pin" // Unpin is the label value for unpin chunk ops (excludes the unpin on persisting). Unpin = "unpin" - // Clone is the label value for clone chunk ops. - Clone = "clone" // Transcode is the label value for transcode chunk ops. Transcode = "transcode" // Drop is the label value for drop chunk ops. diff --git a/pkg/prom1/storage/local/chunk/varbit.go b/pkg/chunk/encoding/varbit.go similarity index 95% rename from pkg/prom1/storage/local/chunk/varbit.go rename to pkg/chunk/encoding/varbit.go index 56e2690821a..91fc3eaad47 100644 --- a/pkg/prom1/storage/local/chunk/varbit.go +++ b/pkg/chunk/encoding/varbit.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "encoding/binary" @@ -275,18 +275,15 @@ func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) { return c.addLaterSample(s, offset) } -// Clone implements chunk. -func (c varbitChunk) Clone() Chunk { - clone := make(varbitChunk, len(c)) - copy(clone, c) - return &clone -} - // NewIterator implements chunk. func (c varbitChunk) NewIterator() Iterator { return newVarbitChunkIterator(c) } +func (c *varbitChunk) Slice(_, _ model.Time) Chunk { + return c +} + // Marshal implements chunk. func (c varbitChunk) Marshal(w io.Writer) error { n, err := w.Write(c) @@ -299,15 +296,6 @@ func (c varbitChunk) Marshal(w io.Writer) error { return nil } -// MarshalToBuf implements chunk. -func (c varbitChunk) MarshalToBuf(buf []byte) error { - n := copy(buf, c) - if n != len(c) { - return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) - } - return nil -} - // Unmarshal implements chunk. func (c varbitChunk) Unmarshal(r io.Reader) error { _, err := io.ReadFull(r, c) @@ -340,8 +328,11 @@ func (c varbitChunk) Len() int { return i } -// FirstTime implements chunk. -func (c varbitChunk) FirstTime() model.Time { +func (c varbitChunk) Size() int { + return len(c) +} + +func (c varbitChunk) firstTime() model.Time { return model.Time( binary.BigEndian.Uint64( c[varbitFirstTimeOffset:], @@ -508,7 +499,7 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { // first time delta from the provided sample and adds it to the chunk together // with the provided sample as the last sample. 
func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { - firstTimeDelta := s.Timestamp - c.FirstTime() + firstTimeDelta := s.Timestamp - c.firstTime() if firstTimeDelta < 0 { return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) } @@ -921,26 +912,6 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator { } } -// lastTimestamp implements Iterator. -func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) { - if it.len == varbitFirstSampleBitOffset { - // No samples in the chunk yet. - return model.Earliest, it.lastError - } - return it.c.lastTime(), it.lastError -} - -// contains implements Iterator. -func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) { - last, err := it.LastTimestamp() - if err != nil { - it.lastError = err - return false, err - } - return !t.Before(it.c.FirstTime()) && - !t.After(last), it.lastError -} - // scan implements Iterator. func (it *varbitChunkIterator) Scan() bool { if it.lastError != nil { @@ -965,7 +936,7 @@ func (it *varbitChunkIterator) Scan() bool { return it.lastError == nil } if it.pos == varbitFirstSampleBitOffset { - it.t = it.c.FirstTime() + it.t = it.c.firstTime() it.v = it.c.firstValue() it.pos = varbitSecondSampleBitOffset return it.lastError == nil @@ -1020,48 +991,12 @@ func (it *varbitChunkIterator) Scan() bool { return it.lastError == nil } -// findAtOrBefore implements Iterator. -func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool { - if it.len == 0 || t.Before(it.c.FirstTime()) { - return false - } - last := it.c.lastTime() - if !t.Before(last) { - it.t = last - it.v = it.c.lastValue() - it.pos = it.len + 1 - return true - } - if t == it.t { - return it.lastError == nil - } - if t.Before(it.t) || it.rewound { - it.reset() - } - - var ( - prevT = model.Earliest - prevV model.SampleValue - ) - for it.Scan() && !t.Before(it.t) { - prevT = it.t - prevV = it.v - // TODO(beorn7): If we are in a repeat, we could iterate forward - // much faster. - } - if t == it.t { - return it.lastError == nil - } - it.rewind(prevT, prevV) - return it.lastError == nil -} - // findAtOrAfter implements Iterator. func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool { if it.len == 0 || t.After(it.c.lastTime()) { return false } - first := it.c.FirstTime() + first := it.c.firstTime() if !t.After(first) { it.reset() return it.Scan() diff --git a/pkg/prom1/storage/local/chunk/varbit_helpers.go b/pkg/chunk/encoding/varbit_helpers.go similarity index 99% rename from pkg/prom1/storage/local/chunk/varbit_helpers.go rename to pkg/chunk/encoding/varbit_helpers.go index 750979404f4..9ca639e9421 100644 --- a/pkg/prom1/storage/local/chunk/varbit_helpers.go +++ b/pkg/chunk/encoding/varbit_helpers.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import "github.com/prometheus/common/model" diff --git a/pkg/prom1/storage/local/chunk/varbit_test.go b/pkg/chunk/encoding/varbit_test.go similarity index 98% rename from pkg/prom1/storage/local/chunk/varbit_test.go rename to pkg/chunk/encoding/varbit_test.go index c0c1084878e..d5f0cd7eeaf 100644 --- a/pkg/prom1/storage/local/chunk/varbit_test.go +++ b/pkg/chunk/encoding/varbit_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package chunk
+package encoding
 
 import "testing"
diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go
index 8d0f0e102b3..bcbe7b07084 100644
--- a/pkg/chunk/testutils/testutils.go
+++ b/pkg/chunk/testutils/testutils.go
@@ -5,7 +5,7 @@ import (
 	"strconv"
 	"time"
 
-	promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
+	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/prometheus/common/model"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 6d3a1df872b..bdb09052502 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -1,6 +1,7 @@
 package distributor
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"net/http"
@@ -19,8 +20,8 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/health/grpc_health_v1"
 
+	"github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/cortexproject/cortex/pkg/ingester/client"
-	"github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
 	"github.com/cortexproject/cortex/pkg/ring"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/chunkcompat"
@@ -469,7 +470,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
 			continue
 		}
 
-		c := chunk.New()
+		c := encoding.New()
 		for _, sample := range ts.Samples {
 			cs, err := c.Add(model.SamplePair{
 				Timestamp: model.Time(sample.TimestampMs),
@@ -481,13 +482,14 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
 			c = cs[0]
 		}
 
+		var buf bytes.Buffer
 		chunk := client.Chunk{
 			Encoding: int32(c.Encoding()),
-			Data:     make([]byte, chunk.ChunkLen, chunk.ChunkLen),
 		}
-		if err := c.MarshalToBuf(chunk.Data); err != nil {
+		if err := c.Marshal(&buf); err != nil {
 			panic(err)
 		}
+		chunk.Data = buf.Bytes()
 
 		results = append(results, &client.QueryStreamResponse{
 			Timeseries: []client.TimeSeriesChunk{
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index acad6b6a02e..2971d1fda4f 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -35,6 +35,11 @@ var (
 		Help:    "Distribution of stored chunk lengths (when stored).",
 		Buckets: prometheus.ExponentialBuckets(5, 2, 11), // biggest bucket is 5*2^(11-1) = 5120
 	})
+	chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{
+		Name:    "cortex_ingester_chunk_size_bytes",
+		Help:    "Distribution of stored chunk sizes (when stored).",
+		Buckets: prometheus.ExponentialBuckets(10, 10, 5), // biggest bucket is 10*10^(5-1) = 100000
+	})
 	chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{
 		Name: "cortex_ingester_chunk_age_seconds",
 		Help: "Distribution of chunk ages (when stored).",
@@ -46,6 +51,10 @@ var (
 		Name: "cortex_ingester_memory_chunks",
 		Help: "The total number of chunks in memory.",
 	})
+	flushReasons = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "cortex_ingester_flush_reasons",
+		Help: "Total number of series scheduled for flushing, with reasons.",
+	}, []string{"reason"})
 )
 
 // Flush triggers a flush of all the chunks and closes the flush queues.
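As an aside on the histograms above: client_golang's `prometheus.ExponentialBuckets(start, factor, count)` returns `count` upper bounds, each `factor` times the previous, so the largest bucket is start*factor^(count-1). A minimal sketch of the two layouts used by `chunkLength` and `chunkSize` (illustrative only, not part of this diff):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// chunkLength buckets: 5, 10, 20, ...; biggest is 5*2^(11-1) = 5120 samples.
	fmt.Println(prometheus.ExponentialBuckets(5, 2, 11))

	// chunkSize buckets: 10, 100, ...; biggest is 10*10^(5-1) = 100000 bytes.
	fmt.Println(prometheus.ExponentialBuckets(10, 10, 5))
}
```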
@@ -109,6 +118,23 @@ const ( reasonIdle ) +func (f flushReason) String() string { + switch f { + case noFlush: + return "NoFlush" + case reasonImmediate: + return "Immediate" + case reasonMultipleChunksInSeries: + return "MultipleChunksInSeries" + case reasonAged: + return "Aged" + case reasonIdle: + return "Idle" + default: + panic("unrecognised flushReason") + } +} + // sweepSeries schedules a series for flushing based on a set of criteria // // NB we don't close the head chunk here, as the series could wait in the queue @@ -120,6 +146,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo firstTime := series.firstTime() flush := i.shouldFlushSeries(series, fp, immediate) + flushReasons.WithLabelValues(flush.String()).Inc() if flush != noFlush { flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) @@ -290,10 +317,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric // Record statistsics only when actual put request did not return error. for _, chunkDesc := range chunkDescs { - utilization, length := chunkDesc.C.Utilization(), chunkDesc.C.Len() - util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime) + utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() + util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime) chunkUtilization.Observe(utilization) chunkLength.Observe(float64(length)) + chunkSize.Observe(float64(size)) chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 00548133180..02be577082b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -12,7 +12,7 @@ import ( old_ctx "golang.org/x/net/context" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -156,7 +156,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c cfg.ingesterClientFactory = client.MakeIngesterClient } - if err := chunk.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil { + if err := encoding.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil { return nil, err } @@ -366,7 +366,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client // QueryStream implements service.IngesterServer func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { - _, _, matchers, err := client.FromQueryRequest(req) + from, through, matchers, err := client.FromQueryRequest(req) if err != nil { return err } @@ -392,8 +392,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ numSeries++ chunks := make([]*desc, 0, len(series.chunkDescs)) for _, chunk := range series.chunkDescs { - if !(chunk.FirstTime.After(model.Time(req.EndTimestampMs)) || chunk.LastTime.Before(model.Time(req.StartTimestampMs))) { - chunks = append(chunks, chunk) + if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { + chunks = append(chunks, 
chunk.slice(from, through)) } } diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index baece97be31..f7958a23e50 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -7,7 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" ) @@ -90,7 +90,7 @@ func (s *memorySeries) add(v model.SamplePair) error { } if len(s.chunkDescs) == 0 || s.headChunkClosed { - newHead := newDesc(chunk.New(), v.Timestamp, v.Timestamp) + newHead := newDesc(encoding.New(), v.Timestamp, v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkClosed = false createdChunks.Inc() @@ -109,11 +109,11 @@ func (s *memorySeries) add(v model.SamplePair) error { } else { s.chunkDescs = s.chunkDescs[:len(s.chunkDescs)-1] for _, c := range chunks { - lastTime, err := c.NewIterator().LastTimestamp() + first, last, err := firstAndLastTimes(c) if err != nil { return err } - s.chunkDescs = append(s.chunkDescs, newDesc(c, c.FirstTime(), lastTime)) + s.chunkDescs = append(s.chunkDescs, newDesc(c, first, last)) createdChunks.Inc() } } @@ -125,6 +125,24 @@ func (s *memorySeries) add(v model.SamplePair) error { return nil } +func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) { + var ( + first model.Time + last model.Time + firstSet bool + iter = c.NewIterator() + ) + for iter.Scan() { + sample := iter.Value() + if !firstSet { + first = sample.Timestamp + firstSet = true + } + last = sample.Timestamp + } + return first, last, iter.Err() +} + func (s *memorySeries) closeHead() { s.headChunkClosed = true } @@ -173,7 +191,7 @@ func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.Sample } for idx := fromIdx; idx <= throughIdx; idx++ { cd := s.chunkDescs[idx] - chValues, err := chunk.RangeValues(cd.C.NewIterator(), in) + chValues, err := encoding.RangeValues(cd.C.NewIterator(), in) if err != nil { return nil, err } @@ -195,14 +213,14 @@ func (s *memorySeries) setChunks(descs []*desc) error { } type desc struct { - C chunk.Chunk // nil if chunk is evicted. - FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. - LastTime model.Time // Timestamp of last sample. Populated at creation & on append. - LastUpdate model.Time // This server's local time on last change - flushed bool // set to true when flush succeeds + C encoding.Chunk // nil if chunk is evicted. + FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. + LastTime model.Time // Timestamp of last sample. Populated at creation & on append. + LastUpdate model.Time // This server's local time on last change + flushed bool // set to true when flush succeeds } -func newDesc(c chunk.Chunk, firstTime model.Time, lastTime model.Time) *desc { +func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc { return &desc{ C: c, FirstTime: firstTime, @@ -214,7 +232,7 @@ func newDesc(c chunk.Chunk, firstTime model.Time, lastTime model.Time) *desc { // Add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. 
-func (d *desc) add(s model.SamplePair) ([]chunk.Chunk, error) { +func (d *desc) add(s model.SamplePair) ([]encoding.Chunk, error) { cs, err := d.C.Add(s) if err != nil { return nil, err @@ -227,3 +245,11 @@ func (d *desc) add(s model.SamplePair) ([]chunk.Chunk, error) { return cs, nil } + +func (d *desc) slice(start, end model.Time) *desc { + return &desc{ + C: d.C.Slice(start, end), + FirstTime: start, + LastTime: end, + } +} diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 0f01bef2eed..ae241aba74b 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -1,6 +1,7 @@ package ingester import ( + "bytes" "context" "fmt" "io" @@ -11,8 +12,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" "github.com/weaveworks/common/user" @@ -146,13 +147,14 @@ func toWireChunks(descs []*desc) ([]client.Chunk, error) { StartTimestampMs: int64(d.FirstTime), EndTimestampMs: int64(d.LastTime), Encoding: int32(d.C.Encoding()), - Data: make([]byte, chunk.ChunkLen, chunk.ChunkLen), } - if err := d.C.MarshalToBuf(wireChunk.Data); err != nil { + buf := bytes.NewBuffer(make([]byte, 0, encoding.ChunkLen)) + if err := d.C.Marshal(buf); err != nil { return nil, err } + wireChunk.Data = buf.Bytes() wireChunks = append(wireChunks, wireChunk) } return wireChunks, nil @@ -168,7 +170,7 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { } var err error - desc.C, err = chunk.NewForEncoding(chunk.Encoding(byte(c.Encoding))) + desc.C, err = encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding))) if err != nil { return nil, err } diff --git a/pkg/prom1/storage/local/chunk/chunk_test.go b/pkg/prom1/storage/local/chunk/chunk_test.go deleted file mode 100644 index 20ac73c7e67..00000000000 --- a/pkg/prom1/storage/local/chunk/chunk_test.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -// Note: this file has tests for code in both delta.go and doubledelta.go -- -// it may make sense to split those out later, but given that the tests are -// near-identical and share a helper, this feels simpler for now. 
- -package chunk - -import ( - "testing" - - "github.com/prometheus/common/model" -) - -func TestLen(t *testing.T) { - chunks := []Chunk{} - for _, encoding := range []Encoding{Delta, DoubleDelta, Varbit} { - c, err := NewForEncoding(encoding) - if err != nil { - t.Fatal(err) - } - chunks = append(chunks, c) - } - - for _, c := range chunks { - for i := 0; i <= 10; i++ { - if c.Len() != i { - t.Errorf("chunk type %s should have %d samples, had %d", c.Encoding(), i, c.Len()) - } - - cs, _ := c.Add(model.SamplePair{ - Timestamp: model.Time(i), - Value: model.SampleValue(i), - }) - c = cs[0] - } - } -} diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index bfc4444ae2f..480b869ddcd 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" ) diff --git a/pkg/querier/batch/chunk.go b/pkg/querier/batch/chunk.go index 2ae3ab955b7..007f2cf60aa 100644 --- a/pkg/querier/batch/chunk.go +++ b/pkg/querier/batch/chunk.go @@ -2,7 +2,7 @@ package batch import ( "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" ) diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 45beb51ef20..3bdcbebb03f 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) const ( @@ -20,18 +20,30 @@ const ( ) func TestChunkIter(t *testing.T) { - chunk := mkChunk(t, 0, 100) - iter := &chunkIterator{} - iter.reset(chunk) - testIter(t, 100, newIteratorAdapter(iter)) - testSeek(t, 100, newIteratorAdapter(iter)) + forEncodings(t, func(t *testing.T, enc promchunk.Encoding) { + chunk := mkChunk(t, 0, 100, enc) + iter := &chunkIterator{} + iter.reset(chunk) + testIter(t, 100, newIteratorAdapter(iter)) + testSeek(t, 100, newIteratorAdapter(iter)) + }) } -func mkChunk(t require.TestingT, from model.Time, points int) chunk.Chunk { +func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) { + for _, enc := range []promchunk.Encoding{ + promchunk.DoubleDelta, promchunk.Varbit, promchunk.Bigchunk, + } { + t.Run(enc.String(), func(t *testing.T) { + f(t, enc) + }) + } +} + +func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk { metric := model.Metric{ model.MetricNameLabel: "foo", } - pc, err := promchunk.NewForEncoding(promchunk.DoubleDelta) + pc, err := promchunk.NewForEncoding(enc) require.NoError(t, err) ts := from for i := 0; i < points; i++ { diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 7a3637b6c59..6823de5c170 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -5,7 +5,7 @@ import ( "sort" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) type mergeIterator struct { diff --git 
diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go
index dc47a10c2bf..0595cf3100d 100644
--- a/pkg/querier/batch/merge_test.go
+++ b/pkg/querier/batch/merge_test.go
@@ -5,33 +5,38 @@ import (
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/prometheus/common/model"
 )
 
 func TestMergeIter(t *testing.T) {
-	chunk1 := mkChunk(t, 0, 100)
-	chunk2 := mkChunk(t, model.TimeFromUnix(25), 100)
-	chunk3 := mkChunk(t, model.TimeFromUnix(50), 100)
-	chunk4 := mkChunk(t, model.TimeFromUnix(75), 100)
-	chunk5 := mkChunk(t, model.TimeFromUnix(100), 100)
-	iter := newMergeIterator([]chunk.Chunk{chunk1, chunk2, chunk3, chunk4, chunk5})
-	testIter(t, 200, newIteratorAdapter(iter))
-	testSeek(t, 200, newIteratorAdapter(iter))
+	forEncodings(t, func(t *testing.T, enc encoding.Encoding) {
+		chunk1 := mkChunk(t, 0, 100, enc)
+		chunk2 := mkChunk(t, model.TimeFromUnix(25), 100, enc)
+		chunk3 := mkChunk(t, model.TimeFromUnix(50), 100, enc)
+		chunk4 := mkChunk(t, model.TimeFromUnix(75), 100, enc)
+		chunk5 := mkChunk(t, model.TimeFromUnix(100), 100, enc)
+		iter := newMergeIterator([]chunk.Chunk{chunk1, chunk2, chunk3, chunk4, chunk5})
+		testIter(t, 200, newIteratorAdapter(iter))
+		testSeek(t, 200, newIteratorAdapter(iter))
+	})
 }
 
 func TestMergeHarder(t *testing.T) {
-	var (
-		numChunks = 24 * 15
-		chunks    = make([]chunk.Chunk, 0)
-		from      = model.Time(0)
-		offset    = 30
-		samples   = 100
-	)
-	for i := 0; i < numChunks; i++ {
-		chunks = append(chunks, mkChunk(t, from, samples))
-		from = from.Add(time.Duration(offset) * time.Second)
-	}
-	iter := newMergeIterator(chunks)
-	testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter))
-	testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter))
+	forEncodings(t, func(t *testing.T, enc encoding.Encoding) {
+		var (
+			numChunks = 24 * 15
+			chunks    = make([]chunk.Chunk, 0)
+			from      = model.Time(0)
+			offset    = 30
+			samples   = 100
+		)
+		for i := 0; i < numChunks; i++ {
+			chunks = append(chunks, mkChunk(t, from, samples, enc))
+			from = from.Add(time.Duration(offset) * time.Second)
+		}
+		iter := newMergeIterator(chunks)
+		testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter))
+		testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter))
+	})
 }
diff --git a/pkg/querier/batch/non_overlapping.go b/pkg/querier/batch/non_overlapping.go
index 2a2eb8e0e16..075ce7b049c 100644
--- a/pkg/querier/batch/non_overlapping.go
+++ b/pkg/querier/batch/non_overlapping.go
@@ -2,7 +2,7 @@ package batch
 
 import (
 	"github.com/cortexproject/cortex/pkg/chunk"
-	promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
+	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
 )
 
 const bufferBatches = 4
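A note on the expected counts in TestMergeHarder above, assuming (as the other batch tests imply) that mkChunk emits one sample per second: the last of the numChunks chunks starts at (numChunks-1)*offset seconds and contributes a further `samples` seconds, so the merged series covers (numChunks-1)*offset + samples = offset*numChunks + samples - offset timestamps. With 24*15 = 360 chunks, a 30-second offset and 100 samples per chunk, that is 30*360 + 100 - 30 = 10870 samples, for every encoding.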
diff --git a/pkg/querier/batch/non_overlapping_test.go b/pkg/querier/batch/non_overlapping_test.go
index 297a35ce011..0af38c68eb6 100644
--- a/pkg/querier/batch/non_overlapping_test.go
+++ b/pkg/querier/batch/non_overlapping_test.go
@@ -4,27 +4,32 @@ import (
 	"testing"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
+	"github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/prometheus/common/model"
 )
 
 func TestNonOverlappingIter(t *testing.T) {
-	cs := []chunk.Chunk{}
-	for i := int64(0); i < 100; i++ {
-		cs = append(cs, mkChunk(t, model.TimeFromUnix(i*10), 10))
-	}
-	testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)))
-	testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+	forEncodings(t, func(t *testing.T, enc encoding.Encoding) {
+		cs := []chunk.Chunk{}
+		for i := int64(0); i < 100; i++ {
+			cs = append(cs, mkChunk(t, model.TimeFromUnix(i*10), 10, enc))
+		}
+		testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+		testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+	})
 }
 
 func TestNonOverlappingIterSparse(t *testing.T) {
-	cs := []chunk.Chunk{
-		mkChunk(t, model.TimeFromUnix(0), 1),
-		mkChunk(t, model.TimeFromUnix(1), 3),
-		mkChunk(t, model.TimeFromUnix(4), 1),
-		mkChunk(t, model.TimeFromUnix(5), 90),
-		mkChunk(t, model.TimeFromUnix(95), 1),
-		mkChunk(t, model.TimeFromUnix(96), 4),
-	}
-	testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)))
-	testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+	forEncodings(t, func(t *testing.T, enc encoding.Encoding) {
+		cs := []chunk.Chunk{
+			mkChunk(t, model.TimeFromUnix(0), 1, enc),
+			mkChunk(t, model.TimeFromUnix(1), 3, enc),
+			mkChunk(t, model.TimeFromUnix(4), 1, enc),
+			mkChunk(t, model.TimeFromUnix(5), 90, enc),
+			mkChunk(t, model.TimeFromUnix(95), 1, enc),
+			mkChunk(t, model.TimeFromUnix(96), 4, enc),
+		}
+		testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+		testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs)))
+	})
 }
diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go
index 728544c661b..3a1946aefb5 100644
--- a/pkg/querier/batch/stream.go
+++ b/pkg/querier/batch/stream.go
@@ -3,7 +3,7 @@ package batch
 import (
 	"fmt"
 
-	promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
+	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
 )
 
 // batchStream deals with iteratoring through multiple, non-overlapping batches,
diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go
index dc582c8ed89..ce379e2e160 100644
--- a/pkg/querier/batch/stream_test.go
+++ b/pkg/querier/batch/stream_test.go
@@ -4,7 +4,7 @@ import (
 	"strconv"
 	"testing"
 
-	promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
+	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/stretchr/testify/require"
 )
 
diff --git a/pkg/querier/chunk_store_queryable_test.go b/pkg/querier/chunk_store_queryable_test.go
index 243656ba618..e09a0ecf1db 100644
--- a/pkg/querier/chunk_store_queryable_test.go
+++ b/pkg/querier/chunk_store_queryable_test.go
@@ -11,7 +11,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
-	promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
+	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
 )
 
 func TestChunkQueryable(t *testing.T) {
diff --git a/pkg/querier/iterators/chunk_iterator.go b/pkg/querier/iterators/chunk_iterator.go
index 8bfdfcbda4d..6971487375a 100644
--- a/pkg/querier/iterators/chunk_iterator.go
+++ b/pkg/querier/iterators/chunk_iterator.go
@@ -2,7 +2,7 @@ package iterators
 
 import (
 	"github.com/cortexproject/cortex/pkg/chunk"
-	promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk"
+	promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
 	"github.com/prometheus/common/model"
 )
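Similarly for TestNonOverlappingIterSparse above: assuming one sample per second, the six chunks contribute 1 + 3 + 1 + 90 + 1 + 4 = 100 samples covering seconds 0 through 99 with no gaps or overlap, which is why the sparse variant is checked against exactly 100 samples for every encoding.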
"github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) const ( diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 72f629ff1ec..8369fd572f0 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/iterators" "github.com/cortexproject/cortex/pkg/util" @@ -57,6 +57,7 @@ var ( }{ {"DoubleDelta", promchunk.DoubleDelta}, {"Varbit", promchunk.Varbit}, + {"Bigchunk", promchunk.Bigchunk}, } queries = []query{ diff --git a/pkg/util/chunkcompat/compat.go b/pkg/util/chunkcompat/compat.go index 4ca310f7f40..446f7a4544d 100644 --- a/pkg/util/chunkcompat/compat.go +++ b/pkg/util/chunkcompat/compat.go @@ -1,11 +1,14 @@ package chunkcompat import ( + "bytes" + + "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/chunk" + prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/util" - "github.com/prometheus/common/model" ) // StreamsToMatrix converts a slice of QueryStreamResponse to a model.Matrix. @@ -82,13 +85,14 @@ func ToChunks(in []chunk.Chunk) ([]client.Chunk, error) { StartTimestampMs: int64(i.From), EndTimestampMs: int64(i.Through), Encoding: int32(i.Data.Encoding()), - Data: make([]byte, prom_chunk.ChunkLen, prom_chunk.ChunkLen), } - if err := i.Data.MarshalToBuf(wireChunk.Data); err != nil { + buf := bytes.NewBuffer(make([]byte, 0, prom_chunk.ChunkLen)) + if err := i.Data.Marshal(buf); err != nil { return nil, err } + wireChunk.Data = buf.Bytes() out = append(out, wireChunk) } return out, nil