From fd6d8e5d62677f225ee5128fe093f510fcbf98b7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 1 Oct 2018 21:18:11 +0100 Subject: [PATCH 01/11] Remove unused code from the chunk package. Also, remove assumptions about marshalled chunk length. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk.go | 10 +- pkg/chunk/chunk_test.go | 18 +- pkg/distributor/distributor_test.go | 6 +- pkg/ingester/series.go | 22 +- pkg/ingester/transfer.go | 6 +- pkg/prom1/storage/local/chunk/chunk.go | 237 +----------------- pkg/prom1/storage/local/chunk/delta.go | 26 -- pkg/prom1/storage/local/chunk/delta_test.go | 6 +- pkg/prom1/storage/local/chunk/doubledelta.go | 7 - .../storage/local/chunk/instrumentation.go | 2 - pkg/prom1/storage/local/chunk/varbit.go | 81 +----- pkg/util/chunkcompat/compat.go | 10 +- 12 files changed, 65 insertions(+), 366 deletions(-) diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 0b5cb30e7c4..eb926358612 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -213,12 +213,12 @@ func (c *Chunk) Encode() ([]byte, error) { // Write the metadata length back at the start of the buffer. // (note this length includes the 4 bytes for the length itself) - binary.BigEndian.PutUint32(metadataLenBytes[:], uint32(buf.Len())) + metadataLen := buf.Len() + binary.BigEndian.PutUint32(metadataLenBytes[:], uint32(metadataLen)) copy(buf.Bytes(), metadataLenBytes[:]) - // Write the data length + // Write another 4 empty bytes - we will come back and put the len in here. dataLenBytes := [4]byte{} - binary.BigEndian.PutUint32(dataLenBytes[:], uint32(prom_chunk.ChunkLen)) if _, err := buf.Write(dataLenBytes[:]); err != nil { return nil, err } @@ -228,6 +228,10 @@ func (c *Chunk) Encode() ([]byte, error) { return nil, err } + // Now write the data len back into the buf. 
+ binary.BigEndian.PutUint32(dataLenBytes[:], uint32(buf.Len()-metadataLen-4)) + copy(buf.Bytes()[metadataLen:], dataLenBytes[:]) + // Now work out the checksum c.encoded = buf.Bytes() c.ChecksumSet = true diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index 66d252ec617..7882eb0fe17 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -24,13 +24,21 @@ func dummyChunk(now model.Time) Chunk { }) } -func dummyChunkFor(now model.Time, metric model.Metric) Chunk { - cs, _ := chunk.New().Add(model.SamplePair{Timestamp: now, Value: 0}) +func dummyChunkForEncoding(now model.Time, metric model.Metric, encoding chunk.Encoding, samples int) Chunk { + c, _ := chunk.NewForEncoding(encoding) + for i := 0; i < samples; i++ { + t := time.Duration(i) * 15 * time.Second + cs, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0}) + if err != nil { + panic(err) + } + c = cs[0] + } chunk := NewChunk( userID, metric.Fingerprint(), metric, - cs[0], + c, now.Add(-time.Hour), now, ) @@ -42,6 +50,10 @@ func dummyChunkFor(now model.Time, metric model.Metric) Chunk { return chunk } +func dummyChunkFor(now model.Time, metric model.Metric) Chunk { + return dummyChunkForEncoding(now, metric, chunk.Varbit, 1) +} + func TestChunkCodec(t *testing.T) { dummy := dummyChunk(model.Now()) decodeContext := NewDecodeContext() diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 6d3a1df872b..6f8cb951d4f 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1,6 +1,7 @@ package distributor import ( + "bytes" "fmt" "io" "net/http" @@ -481,13 +482,14 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest c = cs[0] } + var buf bytes.Buffer chunk := client.Chunk{ Encoding: int32(c.Encoding()), - Data: make([]byte, chunk.ChunkLen, chunk.ChunkLen), } - if err := c.MarshalToBuf(chunk.Data); err != nil { + if err := c.Marshal(&buf); err != nil { panic(err) } + 
chunk.Data = buf.Bytes() results = append(results, &client.QueryStreamResponse{ Timeseries: []client.TimeSeriesChunk{ diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index baece97be31..ba399d78bab 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -109,11 +109,11 @@ func (s *memorySeries) add(v model.SamplePair) error { } else { s.chunkDescs = s.chunkDescs[:len(s.chunkDescs)-1] for _, c := range chunks { - lastTime, err := c.NewIterator().LastTimestamp() + first, last, err := firstAndLastTimes(c) if err != nil { return err } - s.chunkDescs = append(s.chunkDescs, newDesc(c, c.FirstTime(), lastTime)) + s.chunkDescs = append(s.chunkDescs, newDesc(c, first, last)) createdChunks.Inc() } } @@ -125,6 +125,24 @@ func (s *memorySeries) add(v model.SamplePair) error { return nil } +func firstAndLastTimes(c chunk.Chunk) (model.Time, model.Time, error) { + var ( + first model.Time + last model.Time + firstSet bool + iter = c.NewIterator() + ) + for iter.Scan() { + sample := iter.Value() + if !firstSet { + first = sample.Timestamp + firstSet = true + } + last = sample.Timestamp + } + return first, last, iter.Err() +} + func (s *memorySeries) closeHead() { s.headChunkClosed = true } diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 0f01bef2eed..d245e62e073 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -1,6 +1,7 @@ package ingester import ( + "bytes" "context" "fmt" "io" @@ -146,13 +147,14 @@ func toWireChunks(descs []*desc) ([]client.Chunk, error) { StartTimestampMs: int64(d.FirstTime), EndTimestampMs: int64(d.LastTime), Encoding: int32(d.C.Encoding()), - Data: make([]byte, chunk.ChunkLen, chunk.ChunkLen), } - if err := d.C.MarshalToBuf(wireChunk.Data); err != nil { + buf := bytes.NewBuffer(make([]byte, 0, chunk.ChunkLen)) + if err := d.C.Marshal(buf); err != nil { return nil, err } + wireChunk.Data = buf.Bytes() wireChunks = append(wireChunks, wireChunk) } return wireChunks, nil diff --git 
a/pkg/prom1/storage/local/chunk/chunk.go b/pkg/prom1/storage/local/chunk/chunk.go index df5ef9e99f4..da80e3aa119 100644 --- a/pkg/prom1/storage/local/chunk/chunk.go +++ b/pkg/prom1/storage/local/chunk/chunk.go @@ -17,13 +17,10 @@ package chunk import ( - "container/list" "errors" "fmt" "io" "sort" - "sync" - "sync/atomic" "github.com/prometheus/common/model" @@ -41,12 +38,6 @@ var ( errAddedToEvictedChunk = errors.New("attempted to add sample to evicted chunk") ) -// EvictRequest is a request to evict a chunk from memory. -type EvictRequest struct { - Desc *Desc - Evict bool -} - // Encoding defines which encoding we are using, delta, doubledelta, or varbit type Encoding byte @@ -79,193 +70,6 @@ const ( Varbit ) -// Desc contains meta-data for a chunk. Pay special attention to the -// documented requirements for calling its methods concurrently (WRT pinning and -// locking). The doc comments spell out the requirements for each method, but -// here is an overview and general explanation: -// -// Everything that changes the pinning of the underlying chunk or deals with its -// eviction is protected by a mutex. This affects the following methods: Pin, -// Unpin, RefCount, IsEvicted, MaybeEvict. These methods can be called at any -// time without further prerequisites. -// -// Another group of methods acts on (or sets) the underlying chunk. These -// methods involve no locking. They may only be called if the caller has pinned -// the chunk (to guarantee the chunk is not evicted concurrently). Also, the -// caller must make sure nobody else will call these methods concurrently, -// either by holding the sole reference to the Desc (usually during loading -// or creation) or by locking the fingerprint of the series the Desc -// belongs to. The affected methods are: Add, MaybePopulateLastTime, SetChunk. -// -// Finally, there are the special cases FirstTime and LastTime. 
LastTime requires -// to have locked the fingerprint of the series but the chunk does not need to -// be pinned. That's because the ChunkLastTime field in Desc gets populated -// upon completion of the chunk (when it is still pinned, and which happens -// while the series's fingerprint is locked). Once that has happened, calling -// LastTime does not require the chunk to be loaded anymore. Before that has -// happened, the chunk is pinned anyway. The ChunkFirstTime field in Desc -// is populated upon creation of a Desc, so it is alway safe to call -// FirstTime. The FirstTime method is arguably not needed and only there for -// consistency with LastTime. -type Desc struct { - sync.Mutex // Protects pinning. - C Chunk // nil if chunk is evicted. - rCnt int - ChunkFirstTime model.Time // Populated at creation. Immutable. - ChunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. - - // EvictListElement is nil if the chunk is not in the evict list. - // EvictListElement is _not_ protected by the Desc mutex. - // It must only be touched by the evict list handler in MemorySeriesStorage. - EvictListElement *list.Element -} - -// NewDesc creates a new Desc pointing to the provided chunk. The provided chunk -// is assumed to be not persisted yet. Therefore, the refCount of the new -// Desc is 1 (preventing eviction prior to persisting). -func NewDesc(c Chunk, firstTime model.Time) *Desc { - Ops.WithLabelValues(CreateAndPin).Inc() - atomic.AddInt64(&NumMemChunks, 1) - NumMemDescs.Inc() - return &Desc{ - C: c, - rCnt: 1, - ChunkFirstTime: firstTime, - ChunkLastTime: model.Earliest, - } -} - -// Add adds a sample pair to the underlying chunk. For safe concurrent access, -// The chunk must be pinned, and the caller must have locked the fingerprint of -// the series. -func (d *Desc) Add(s model.SamplePair) ([]Chunk, error) { - if d.C == nil { - return nil, errAddedToEvictedChunk - } - return d.C.Add(s) -} - -// Pin increments the refCount by one. 
Upon increment from 0 to 1, this -// Desc is removed from the evict list. To enable the latter, the -// evictRequests channel has to be provided. This method can be called -// concurrently at any time. -func (d *Desc) Pin(evictRequests chan<- EvictRequest) { - d.Lock() - defer d.Unlock() - - if d.rCnt == 0 { - // Remove ourselves from the evict list. - evictRequests <- EvictRequest{d, false} - } - d.rCnt++ -} - -// Unpin decrements the refCount by one. Upon decrement from 1 to 0, this -// Desc is added to the evict list. To enable the latter, the evictRequests -// channel has to be provided. This method can be called concurrently at any -// time. -func (d *Desc) Unpin(evictRequests chan<- EvictRequest) { - d.Lock() - defer d.Unlock() - - if d.rCnt == 0 { - panic("cannot unpin already unpinned chunk") - } - d.rCnt-- - if d.rCnt == 0 { - // Add ourselves to the back of the evict list. - evictRequests <- EvictRequest{d, true} - } -} - -// RefCount returns the number of pins. This method can be called concurrently -// at any time. -func (d *Desc) RefCount() int { - d.Lock() - defer d.Unlock() - - return d.rCnt -} - -// FirstTime returns the timestamp of the first sample in the chunk. This method -// can be called concurrently at any time. It only returns the immutable -// d.ChunkFirstTime without any locking. Arguably, this method is -// useless. However, it provides consistency with the LastTime method. -func (d *Desc) FirstTime() model.Time { - return d.ChunkFirstTime -} - -// LastTime returns the timestamp of the last sample in the chunk. For safe -// concurrent access, this method requires the fingerprint of the time series to -// be locked. -func (d *Desc) LastTime() (model.Time, error) { - if d.ChunkLastTime != model.Earliest || d.C == nil { - return d.ChunkLastTime, nil - } - return d.C.NewIterator().LastTimestamp() -} - -// MaybePopulateLastTime populates the ChunkLastTime from the underlying chunk -// if it has not yet happened. 
Call this method directly after having added the -// last sample to a chunk or after closing a head chunk due to age. For safe -// concurrent access, the chunk must be pinned, and the caller must have locked -// the fingerprint of the series. -func (d *Desc) MaybePopulateLastTime() error { - if d.ChunkLastTime == model.Earliest && d.C != nil { - t, err := d.C.NewIterator().LastTimestamp() - if err != nil { - return err - } - d.ChunkLastTime = t - } - return nil -} - -// IsEvicted returns whether the chunk is evicted. For safe concurrent access, -// the caller must have locked the fingerprint of the series. -func (d *Desc) IsEvicted() bool { - // Locking required here because we do not want the caller to force - // pinning the chunk first, so it could be evicted while this method is - // called. - d.Lock() - defer d.Unlock() - - return d.C == nil -} - -// SetChunk sets the underlying chunk. The caller must have locked the -// fingerprint of the series and must have "pre-pinned" the chunk (i.e. first -// call Pin and then set the chunk). -func (d *Desc) SetChunk(c Chunk) { - if d.C != nil { - panic("chunk already set") - } - d.C = c -} - -// MaybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk -// is now evicted, which includes the case that the chunk was evicted even -// before this method was called. It can be called concurrently at any time. -func (d *Desc) MaybeEvict() bool { - d.Lock() - defer d.Unlock() - - if d.C == nil { - return true - } - if d.rCnt != 0 { - return false - } - if d.ChunkLastTime == model.Earliest { - // This must never happen. - panic("ChunkLastTime not populated for evicted chunk") - } - d.C = nil - Ops.WithLabelValues(Evict).Inc() - atomic.AddInt64(&NumMemChunks, -1) - return true -} - // Chunk is the interface for all chunks. Chunks are generally not // goroutine-safe. type Chunk interface { @@ -276,11 +80,8 @@ type Chunk interface { // or a newly allocated version. 
In any case, take the returned chunk as // the relevant one and discard the original chunk. Add(sample model.SamplePair) ([]Chunk, error) - Clone() Chunk - FirstTime() model.Time NewIterator() Iterator Marshal(io.Writer) error - MarshalToBuf([]byte) error Unmarshal(io.Reader) error UnmarshalFromBuf([]byte) error Encoding() Encoding @@ -295,21 +96,12 @@ type Chunk interface { // generally not safe to use an Iterator concurrently with or after chunk // mutation. type Iterator interface { - // Gets the last timestamp in the chunk. - LastTimestamp() (model.Time, error) - // Whether a given timestamp is contained between first and last value - // in the chunk. - Contains(model.Time) (bool, error) // Scans the next value in the chunk. Directly after the iterator has // been created, the next value is the first value in the // chunk. Otherwise, it is the value following the last value scanned or // found (by one of the Find... methods). Returns false if either the // end of the chunk is reached or an error has occurred. Scan() bool - // Finds the most recent value at or before the provided time. Returns - // false if either the chunk contains no value at or before the provided - // time, or an error has occurred. - FindAtOrBefore(model.Time) bool // Finds the oldest value at or after the provided time. Returns false // if either the chunk contains no value at or after the provided time, // or an error has occurred. @@ -319,7 +111,7 @@ type Iterator interface { // those methods were called. Value() model.SamplePair // Returns a batch of the provisded size; NB not idempotent! Should only be called - // once per Scan/FindAtOrBefore. + // once per Scan. Batch(size int) Batch // Returns the last error encountered. In general, an error signals data // corruption in the chunk and requires quarantining. @@ -446,17 +238,6 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC } } -// lastTimestamp implements Iterator. 
-func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) { - return it.acc.timestampAtIndex(it.len - 1), it.acc.err() -} - -// contains implements Iterator. -func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) { - return !t.Before(it.acc.timestampAtIndex(0)) && - !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() -} - // scan implements Iterator. func (it *indexAccessingChunkIterator) Scan() bool { it.pos++ @@ -470,22 +251,6 @@ func (it *indexAccessingChunkIterator) Scan() bool { return it.acc.err() == nil } -// findAtOrBefore implements Iterator. -func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool { - i := sort.Search(it.len, func(i int) bool { - return it.acc.timestampAtIndex(i).After(t) - }) - if i == 0 || it.acc.err() != nil { - return false - } - it.pos = i - 1 - it.lastValue = model.SamplePair{ - Timestamp: it.acc.timestampAtIndex(i - 1), - Value: it.acc.sampleValueAtIndex(i - 1), - } - return true -} - // findAtOrAfter implements Iterator. func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool { i := sort.Search(it.len, func(i int) bool { diff --git a/pkg/prom1/storage/local/chunk/delta.go b/pkg/prom1/storage/local/chunk/delta.go index 0fa2f917bc1..6252668b73d 100644 --- a/pkg/prom1/storage/local/chunk/delta.go +++ b/pkg/prom1/storage/local/chunk/delta.go @@ -180,18 +180,6 @@ func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { return []Chunk{&c}, nil } -// Clone implements chunk. -func (c deltaEncodedChunk) Clone() Chunk { - clone := make(deltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - -// FirstTime implements chunk. -func (c deltaEncodedChunk) FirstTime() model.Time { - return c.baseTime() -} - // NewIterator implements chunk. 
func (c *deltaEncodedChunk) NewIterator() Iterator { return newIndexAccessingChunkIterator(c.Len(), &deltaEncodedIndexAccessor{ @@ -221,20 +209,6 @@ func (c deltaEncodedChunk) Marshal(w io.Writer) error { return nil } -// MarshalToBuf implements chunk. -func (c deltaEncodedChunk) MarshalToBuf(buf []byte) error { - if len(c) > math.MaxUint16 { - panic("chunk buffer length would overflow a 16 bit uint") - } - binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(len(c))) - - n := copy(buf, c) - if n != len(c) { - return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) - } - return nil -} - // Unmarshal implements chunk. func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] diff --git a/pkg/prom1/storage/local/chunk/delta_test.go b/pkg/prom1/storage/local/chunk/delta_test.go index 93b7f9695b4..0e95814d7e4 100644 --- a/pkg/prom1/storage/local/chunk/delta_test.go +++ b/pkg/prom1/storage/local/chunk/delta_test.go @@ -90,11 +90,11 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { t.Fatalf("Couldn't add sample to empty %s: %s", c.chunkTypeName, err) } - buf := make([]byte, ChunkLen) - - cs[0].MarshalToBuf(buf) + var writer bytes.Buffer + cs[0].Marshal(&writer) // Corrupt time byte to 0, which is illegal. + buf := writer.Bytes() buf[c.timeBytesPos] = 0 err = cs[0].UnmarshalFromBuf(buf) verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes") diff --git a/pkg/prom1/storage/local/chunk/doubledelta.go b/pkg/prom1/storage/local/chunk/doubledelta.go index 3c6537f63ac..d171e28bf9e 100644 --- a/pkg/prom1/storage/local/chunk/doubledelta.go +++ b/pkg/prom1/storage/local/chunk/doubledelta.go @@ -187,13 +187,6 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { return []Chunk{&c}, nil } -// Clone implements chunk. 
-func (c doubleDeltaEncodedChunk) Clone() Chunk { - clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) - copy(clone, c) - return &clone -} - // FirstTime implements chunk. func (c doubleDeltaEncodedChunk) FirstTime() model.Time { return c.baseTime() diff --git a/pkg/prom1/storage/local/chunk/instrumentation.go b/pkg/prom1/storage/local/chunk/instrumentation.go index c73fe991089..c28c9950cec 100644 --- a/pkg/prom1/storage/local/chunk/instrumentation.go +++ b/pkg/prom1/storage/local/chunk/instrumentation.go @@ -65,8 +65,6 @@ const ( Pin = "pin" // Unpin is the label value for unpin chunk ops (excludes the unpin on persisting). Unpin = "unpin" - // Clone is the label value for clone chunk ops. - Clone = "clone" // Transcode is the label value for transcode chunk ops. Transcode = "transcode" // Drop is the label value for drop chunk ops. diff --git a/pkg/prom1/storage/local/chunk/varbit.go b/pkg/prom1/storage/local/chunk/varbit.go index 56e2690821a..600678fab58 100644 --- a/pkg/prom1/storage/local/chunk/varbit.go +++ b/pkg/prom1/storage/local/chunk/varbit.go @@ -275,13 +275,6 @@ func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) { return c.addLaterSample(s, offset) } -// Clone implements chunk. -func (c varbitChunk) Clone() Chunk { - clone := make(varbitChunk, len(c)) - copy(clone, c) - return &clone -} - // NewIterator implements chunk. func (c varbitChunk) NewIterator() Iterator { return newVarbitChunkIterator(c) @@ -299,15 +292,6 @@ func (c varbitChunk) Marshal(w io.Writer) error { return nil } -// MarshalToBuf implements chunk. -func (c varbitChunk) MarshalToBuf(buf []byte) error { - n := copy(buf, c) - if n != len(c) { - return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) - } - return nil -} - // Unmarshal implements chunk. func (c varbitChunk) Unmarshal(r io.Reader) error { _, err := io.ReadFull(r, c) @@ -340,8 +324,7 @@ func (c varbitChunk) Len() int { return i } -// FirstTime implements chunk. 
-func (c varbitChunk) FirstTime() model.Time { +func (c varbitChunk) firstTime() model.Time { return model.Time( binary.BigEndian.Uint64( c[varbitFirstTimeOffset:], @@ -508,7 +491,7 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { // first time delta from the provided sample and adds it to the chunk together // with the provided sample as the last sample. func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { - firstTimeDelta := s.Timestamp - c.FirstTime() + firstTimeDelta := s.Timestamp - c.firstTime() if firstTimeDelta < 0 { return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) } @@ -921,26 +904,6 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator { } } -// lastTimestamp implements Iterator. -func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) { - if it.len == varbitFirstSampleBitOffset { - // No samples in the chunk yet. - return model.Earliest, it.lastError - } - return it.c.lastTime(), it.lastError -} - -// contains implements Iterator. -func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) { - last, err := it.LastTimestamp() - if err != nil { - it.lastError = err - return false, err - } - return !t.Before(it.c.FirstTime()) && - !t.After(last), it.lastError -} - // scan implements Iterator. func (it *varbitChunkIterator) Scan() bool { if it.lastError != nil { @@ -965,7 +928,7 @@ func (it *varbitChunkIterator) Scan() bool { return it.lastError == nil } if it.pos == varbitFirstSampleBitOffset { - it.t = it.c.FirstTime() + it.t = it.c.firstTime() it.v = it.c.firstValue() it.pos = varbitSecondSampleBitOffset return it.lastError == nil @@ -1020,48 +983,12 @@ func (it *varbitChunkIterator) Scan() bool { return it.lastError == nil } -// findAtOrBefore implements Iterator. 
-func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool { - if it.len == 0 || t.Before(it.c.FirstTime()) { - return false - } - last := it.c.lastTime() - if !t.Before(last) { - it.t = last - it.v = it.c.lastValue() - it.pos = it.len + 1 - return true - } - if t == it.t { - return it.lastError == nil - } - if t.Before(it.t) || it.rewound { - it.reset() - } - - var ( - prevT = model.Earliest - prevV model.SampleValue - ) - for it.Scan() && !t.Before(it.t) { - prevT = it.t - prevV = it.v - // TODO(beorn7): If we are in a repeat, we could iterate forward - // much faster. - } - if t == it.t { - return it.lastError == nil - } - it.rewind(prevT, prevV) - return it.lastError == nil -} - // findAtOrAfter implements Iterator. func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool { if it.len == 0 || t.After(it.c.lastTime()) { return false } - first := it.c.FirstTime() + first := it.c.firstTime() if !t.After(first) { it.reset() return it.Scan() diff --git a/pkg/util/chunkcompat/compat.go b/pkg/util/chunkcompat/compat.go index 4ca310f7f40..8a2c7179b73 100644 --- a/pkg/util/chunkcompat/compat.go +++ b/pkg/util/chunkcompat/compat.go @@ -1,11 +1,14 @@ package chunkcompat import ( + "bytes" + + "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester/client" prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/util" - "github.com/prometheus/common/model" ) // StreamsToMatrix converts a slice of QueryStreamResponse to a model.Matrix. 
@@ -82,13 +85,14 @@ func ToChunks(in []chunk.Chunk) ([]client.Chunk, error) { StartTimestampMs: int64(i.From), EndTimestampMs: int64(i.Through), Encoding: int32(i.Data.Encoding()), - Data: make([]byte, prom_chunk.ChunkLen, prom_chunk.ChunkLen), } - if err := i.Data.MarshalToBuf(wireChunk.Data); err != nil { + buf := bytes.NewBuffer(make([]byte, 0, prom_chunk.ChunkLen)) + if err := i.Data.Marshal(buf); err != nil { return nil, err } + wireChunk.Data = buf.Bytes() out = append(out, wireChunk) } return out, nil From 8f2623c47425e9e32a6c59ea5a579cdbb76cefcb Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 2 Oct 2018 11:50:13 +0100 Subject: [PATCH 02/11] Expose reasons as counter. Signed-off-by: Tom Wilkie --- pkg/ingester/flush.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index acad6b6a02e..f51391bad7a 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -46,6 +46,10 @@ var ( Name: "cortex_ingester_memory_chunks", Help: "The total number of chunks in memory.", }) + flushReasons = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_ingester_flush_reasons", + Help: "Total number of series scheduled for flushing, with reasons.", + }, []string{"reason"}) ) // Flush triggers a flush of all the chunks and closes the flush queues. 
@@ -109,6 +113,23 @@ const ( reasonIdle ) +func (f flushReason) String() string { + switch f { + case noFlush: + return "NoFlush" + case reasonImmediate: + return "Immediate" + case reasonMultipleChunksInSeries: + return "MultipleChunksInSeries" + case reasonAged: + return "Aged" + case reasonIdle: + return "Idle" + default: + panic("unrecognised flushReason") + } +} + // sweepSeries schedules a series for flushing based on a set of criteria // // NB we don't close the head chunk here, as the series could wait in the queue @@ -120,6 +141,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo firstTime := series.firstTime() flush := i.shouldFlushSeries(series, fp, immediate) + flushReasons.WithLabelValues(flush.String()).Inc() if flush != noFlush { flushQueueIndex := int(uint64(fp) % uint64(i.cfg.ConcurrentFlushes)) From 0d5069fe18a652892b2771a47d9a69c226f26df7 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 2 Oct 2018 11:05:34 +0100 Subject: [PATCH 03/11] Add bigchunk implementation. 
Signed-off-by: Tom Wilkie --- Gopkg.lock | 1 + pkg/chunk/chunk.go | 5 + pkg/prom1/storage/local/chunk/bigchunk.go | 230 ++++++++++++++++++++ pkg/prom1/storage/local/chunk/chunk.go | 6 + pkg/prom1/storage/local/chunk/chunk_test.go | 127 +++++++++++ pkg/querier/batch/chunk_test.go | 2 +- pkg/querier/querier_test.go | 1 + 7 files changed, 371 insertions(+), 1 deletion(-) create mode 100644 pkg/prom1/storage/local/chunk/bigchunk.go diff --git a/Gopkg.lock b/Gopkg.lock index f35732327e8..2a0c8fa99df 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1411,6 +1411,7 @@ "github.com/prometheus/prometheus/util/strutil", "github.com/prometheus/prometheus/web/api/v1", "github.com/prometheus/tsdb", + "github.com/prometheus/tsdb/chunkenc", "github.com/prometheus/tsdb/fileutil", "github.com/segmentio/fasthash/fnv1a", "github.com/stretchr/testify/assert", diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index eb926358612..1aa80308dd6 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -28,6 +28,7 @@ const ( ErrInvalidChecksum = errs.Error("invalid chunk checksum") ErrWrongMetadata = errs.Error("wrong chunk metadata") ErrMetadataLength = errs.Error("chunk metadata wrong length") + ErrDataLength = errs.Error("chunk data wrong length") ) var castagnoliTable = crc32.MakeTable(crc32.Castagnoli) @@ -318,6 +319,10 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { c.encoded = input remainingData := input[len(input)-r.Len():] + if int(dataLen) > len(remainingData) { + return ErrDataLength + } + return c.Data.UnmarshalFromBuf(remainingData[:int(dataLen)]) } diff --git a/pkg/prom1/storage/local/chunk/bigchunk.go b/pkg/prom1/storage/local/chunk/bigchunk.go new file mode 100644 index 00000000000..43522d10732 --- /dev/null +++ b/pkg/prom1/storage/local/chunk/bigchunk.go @@ -0,0 +1,230 @@ +package chunk + +import ( + "bytes" + "encoding/binary" + "errors" + "io" + "io/ioutil" + + "github.com/prometheus/common/model" + "github.com/prometheus/tsdb/chunkenc" +) + 
+const samplesPerChunk = 120 + +var errOutOfBounds = errors.New("out of bounds") + +// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no +// upperbound on number of samples it can contain. +type bigchunk struct { + chunks []chunkenc.Chunk + appender chunkenc.Appender + remainingSamples int +} + +func newBigchunk() *bigchunk { + return &bigchunk{} +} + +func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { + if b.remainingSamples == 0 { + chunk := chunkenc.NewXORChunk() + appender, err := chunk.Appender() + if err != nil { + return nil, err + } + + b.chunks = append(b.chunks, chunk) + b.appender = appender + b.remainingSamples = samplesPerChunk + } + + b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) + b.remainingSamples-- + return []Chunk{b}, nil +} + +func (b *bigchunk) Marshal(wio io.Writer) error { + w := writer{wio} + if err := w.WriteVarInt16(uint16(len(b.chunks))); err != nil { + return err + } + for _, chunk := range b.chunks { + buf := chunk.Bytes() + if err := w.WriteVarInt16(uint16(len(buf))); err != nil { + return err + } + if _, err := w.Write(buf); err != nil { + return err + } + } + return nil +} + +func (b *bigchunk) MarshalToBuf(buf []byte) error { + writer := bytes.NewBuffer(buf) + return b.Marshal(writer) +} + +func (b *bigchunk) Unmarshal(r io.Reader) error { + buf, err := ioutil.ReadAll(r) + if err != nil { + return err + } + return b.UnmarshalFromBuf(buf) +} + +func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { + r := reader{buf: buf} + numChunks, err := r.ReadUint16() + if err != nil { + return err + } + + b.chunks = make([]chunkenc.Chunk, 0, numChunks) + for i := uint16(0); i < numChunks; i++ { + chunkLen, err := r.ReadUint16() + if err != nil { + return err + } + + chunkBuf, err := r.ReadBytes(int(chunkLen)) + if err != nil { + return err + } + + chunk, err := chunkenc.FromData(chunkenc.EncXOR, chunkBuf) + if err != nil { + return err + } + + b.chunks = append(b.chunks, chunk) + } + 
return nil +} + +func (b *bigchunk) Encoding() Encoding { + return Bigchunk +} + +func (b *bigchunk) Utilization() float64 { + return 1.0 +} + +func (b *bigchunk) Len() int { + sum := 0 + for _, c := range b.chunks { + sum += c.NumSamples() + } + return sum +} + +func (b *bigchunk) NewIterator() Iterator { + return &bigchunkIterator{ + bigchunk: b, + } +} + +type writer struct { + io.Writer +} + +func (w writer) WriteVarInt16(i uint16) error { + var b [2]byte + binary.LittleEndian.PutUint16(b[:], i) + _, err := w.Write(b[:]) + return err +} + +type reader struct { + i int + buf []byte +} + +func (r *reader) ReadUint16() (uint16, error) { + if r.i+2 > len(r.buf) { + return 0, errOutOfBounds + } + result := binary.LittleEndian.Uint16(r.buf[r.i:]) + r.i += 2 + return result, nil +} + +func (r *reader) ReadBytes(count int) ([]byte, error) { + if r.i+count > len(r.buf) { + return nil, errOutOfBounds + } + result := r.buf[r.i : r.i+count] + r.i += count + return result, nil +} + +type bigchunkIterator struct { + *bigchunk + + iter chunkenc.Iterator + i int +} + +func (i *bigchunkIterator) FindAtOrAfter(target model.Time) bool { + i.i = 0 + for i.i < len(i.chunks) { + i.iter = i.chunks[i.i].Iterator() + i.i++ + + for i.iter.Next() { + t, _ := i.iter.At() + if t >= int64(target) { + return true + } + } + } + return false +} + +func (i *bigchunkIterator) Scan() bool { + if i.iter != nil && i.iter.Next() { + return true + } + + for i.i < len(i.chunks) { + i.iter = i.chunks[i.i].Iterator() + i.i++ + if i.iter.Next() { + return true + } + } + return false +} + +func (i *bigchunkIterator) Value() model.SamplePair { + t, v := i.iter.At() + return model.SamplePair{ + Timestamp: model.Time(t), + Value: model.SampleValue(v), + } +} + +func (i *bigchunkIterator) Batch(size int) Batch { + var result Batch + j := 0 + for j < size { + t, v := i.iter.At() + result.Timestamps[j] = t + result.Values[j] = v + j++ + if j < size && !i.Scan() { + break + } + } + result.Length = j + return 
result +} + +func (i *bigchunkIterator) Err() error { + if i.iter != nil { + return i.iter.Err() + } + return nil +} diff --git a/pkg/prom1/storage/local/chunk/chunk.go b/pkg/prom1/storage/local/chunk/chunk.go index da80e3aa119..31e2030e2bb 100644 --- a/pkg/prom1/storage/local/chunk/chunk.go +++ b/pkg/prom1/storage/local/chunk/chunk.go @@ -55,6 +55,8 @@ func (e *Encoding) Set(s string) error { *e = DoubleDelta case "2": *e = Varbit + case "3": + *e = Bigchunk default: return fmt.Errorf("invalid chunk encoding: %s", s) } @@ -68,6 +70,8 @@ const ( DoubleDelta // Varbit encoding Varbit + // Bigchunk encoding + Bigchunk ) // Chunk is the interface for all chunks. Chunks are generally not @@ -208,6 +212,8 @@ func NewForEncoding(encoding Encoding) (Chunk, error) { return newDoubleDeltaEncodedChunk(d1, d0, true, ChunkLen), nil case Varbit: return newVarbitChunk(varbitZeroEncoding), nil + case Bigchunk: + return newBigchunk(), nil default: return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) } diff --git a/pkg/prom1/storage/local/chunk/chunk_test.go b/pkg/prom1/storage/local/chunk/chunk_test.go index 20ac73c7e67..10d67da6bf8 100644 --- a/pkg/prom1/storage/local/chunk/chunk_test.go +++ b/pkg/prom1/storage/local/chunk/chunk_test.go @@ -18,9 +18,13 @@ package chunk import ( + "bytes" + "fmt" "testing" + "time" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" ) func TestLen(t *testing.T) { @@ -47,3 +51,126 @@ func TestLen(t *testing.T) { } } } + +var step = int(15 * time.Second / time.Millisecond) + +func TestChunk(t *testing.T) { + for _, tc := range []struct { + encoding Encoding + maxSamples int + }{ + {DoubleDelta, 989}, + {Varbit, 2048}, + {Bigchunk, 4096}, + } { + for samples := 0; samples < tc.maxSamples; samples += tc.maxSamples / 10 { + + // DoubleDelta doesn't support zero length chunks. 
+ if tc.encoding == DoubleDelta && samples == 0 { + continue + } + + t.Run(fmt.Sprintf("testChunkEncoding/%s/%d", tc.encoding.String(), samples), func(t *testing.T) { + testChunkEncoding(t, tc.encoding, samples) + }) + + t.Run(fmt.Sprintf("testChunkSeek/%s/%d", tc.encoding.String(), samples), func(t *testing.T) { + testChunkSeek(t, tc.encoding, samples) + }) + + t.Run(fmt.Sprintf("testChunkBatch/%s/%d", tc.encoding.String(), samples), func(t *testing.T) { + testChunkBatch(t, tc.encoding, samples) + }) + } + } +} + +func mkChunk(t *testing.T, encoding Encoding, samples int) Chunk { + chunk, err := NewForEncoding(encoding) + require.NoError(t, err) + + for i := 0; i < samples; i++ { + chunks, err := chunk.Add(model.SamplePair{ + Timestamp: model.Time(i * step), + Value: model.SampleValue(i), + }) + require.NoError(t, err) + require.Len(t, chunks, 1) + chunk = chunks[0] + } + + return chunk +} + +// testChunkEncoding checks chunks roundtrip and contain all their samples. +func testChunkEncoding(t *testing.T, encoding Encoding, samples int) { + chunk := mkChunk(t, encoding, samples) + + var buf bytes.Buffer + err := chunk.Marshal(&buf) + require.NoError(t, err) + + bs1 := buf.Bytes() + chunk, err = NewForEncoding(encoding) + err = chunk.Unmarshal(&buf) + require.NoError(t, err) + + // Check all the samples are in there. + iter := chunk.NewIterator() + for i := 0; i < samples; i++ { + require.True(t, iter.Scan()) + sample := iter.Value() + require.EqualValues(t, model.Time(i*step), sample.Timestamp) + require.EqualValues(t, model.SampleValue(i), sample.Value) + } + require.False(t, iter.Scan()) + require.NoError(t, iter.Err()) + + // Check the byte representation after another Marshall is the same. + err = chunk.Marshal(&buf) + require.NoError(t, err) + bs2 := buf.Bytes() + + require.True(t, bytes.Equal(bs1, bs2)) +} + +// testChunkSeek checks seek works as expected. 
+func testChunkSeek(t *testing.T, encoding Encoding, samples int) { + chunk := mkChunk(t, encoding, samples) + + iter := chunk.NewIterator() + for i := 0; i < samples; i += samples / 10 { + require.True(t, iter.FindAtOrAfter(model.Time(i*step))) + sample := iter.Value() + require.EqualValues(t, model.Time(i*step), sample.Timestamp) + require.EqualValues(t, model.SampleValue(i), sample.Value) + + i++ + for ; i < samples; i++ { + require.True(t, iter.Scan()) + sample := iter.Value() + require.EqualValues(t, model.Time(i*step), sample.Timestamp) + require.EqualValues(t, model.SampleValue(i), sample.Value) + } + require.False(t, iter.Scan()) + require.NoError(t, iter.Err()) + } +} + +func testChunkBatch(t *testing.T, encoding Encoding, samples int) { + chunk := mkChunk(t, encoding, samples) + + // Check all the samples are in there. + iter := chunk.NewIterator() + for i := 0; i < samples; { + require.True(t, iter.Scan()) + batch := iter.Batch(BatchSize) + for j := 0; j < batch.Length; j++ { + require.EqualValues(t, int64((i+j)*step), batch.Timestamps[j]) + require.EqualValues(t, float64(i+j), batch.Values[j]) + } + i += batch.Length + } + require.False(t, iter.Scan()) + require.NoError(t, iter.Err()) +} diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 45beb51ef20..cf46e895c81 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -31,7 +31,7 @@ func mkChunk(t require.TestingT, from model.Time, points int) chunk.Chunk { metric := model.Metric{ model.MetricNameLabel: "foo", } - pc, err := promchunk.NewForEncoding(promchunk.DoubleDelta) + pc, err := promchunk.NewForEncoding(promchunk.Bigchunk) require.NoError(t, err) ts := from for i := 0; i < points; i++ { diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 72f629ff1ec..f55c10db87e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -57,6 +57,7 @@ var ( }{ {"DoubleDelta", promchunk.DoubleDelta}, 
{"Varbit", promchunk.Varbit}, + {"Bigchunk", promchunk.Bigchunk}, } queries = []query{ From 7b3215f3b4de6cc8087fd3f94cfb946b50770ad8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 16 Oct 2018 11:56:22 +0100 Subject: [PATCH 04/11] Add index to biggerchunks to speed up Seeks; reduce size of subchunks to reduce amount of iteration we have to do through the chunk to find the right place. Signed-off-by: Tom Wilkie --- pkg/prom1/storage/local/chunk/bigchunk.go | 61 ++++++++++++++++++--- pkg/prom1/storage/local/chunk/chunk_test.go | 8 +-- 2 files changed, 57 insertions(+), 12 deletions(-) diff --git a/pkg/prom1/storage/local/chunk/bigchunk.go b/pkg/prom1/storage/local/chunk/bigchunk.go index 43522d10732..3d20a03f7cf 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk.go +++ b/pkg/prom1/storage/local/chunk/bigchunk.go @@ -11,14 +11,17 @@ import ( "github.com/prometheus/tsdb/chunkenc" ) -const samplesPerChunk = 120 +const samplesPerChunk = 60 var errOutOfBounds = errors.New("out of bounds") // bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no // upperbound on number of samples it can contain. 
type bigchunk struct { - chunks []chunkenc.Chunk + chunks []chunkenc.Chunk + starts []int64 + ends []int64 + appender chunkenc.Appender remainingSamples int } @@ -35,13 +38,17 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { return nil, err } + b.starts = append(b.starts, int64(sample.Timestamp)) + b.ends = append(b.ends, int64(sample.Timestamp)) b.chunks = append(b.chunks, chunk) + b.appender = appender b.remainingSamples = samplesPerChunk } b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) b.remainingSamples-- + b.ends[len(b.ends)-1] = int64(sample.Timestamp) return []Chunk{b}, nil } @@ -99,7 +106,14 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { return err } + start, end, err := firstAndLastTimes(chunk) + if err != nil { + return err + } + b.chunks = append(b.chunks, chunk) + b.starts = append(b.starts, start) + b.ends = append(b.ends, end) } return nil } @@ -168,18 +182,30 @@ type bigchunkIterator struct { } func (i *bigchunkIterator) FindAtOrAfter(target model.Time) bool { + // On average we'll have about 12*3600/15/60 = 48 chunks, so just linear + // scan for now. 
i.i = 0 for i.i < len(i.chunks) { - i.iter = i.chunks[i.i].Iterator() + if int64(target) <= i.ends[i.i] { + break + } i.i++ + } - for i.iter.Next() { - t, _ := i.iter.At() - if t >= int64(target) { - return true - } + if i.i >= len(i.chunks) { + return false + } + + i.iter = i.chunks[i.i].Iterator() + i.i++ + + for i.iter.Next() { + t, _ := i.iter.At() + if t >= int64(target) { + return true } } + return false } @@ -214,6 +240,7 @@ func (i *bigchunkIterator) Batch(size int) Batch { result.Timestamps[j] = t result.Values[j] = v j++ + if j < size && !i.Scan() { break } @@ -228,3 +255,21 @@ func (i *bigchunkIterator) Err() error { } return nil } + +func firstAndLastTimes(c chunkenc.Chunk) (int64, int64, error) { + var ( + first int64 + last int64 + firstSet bool + iter = c.Iterator() + ) + for iter.Next() { + t, _ := iter.At() + if !firstSet { + first = t + firstSet = true + } + last = t + } + return first, last, iter.Err() +} diff --git a/pkg/prom1/storage/local/chunk/chunk_test.go b/pkg/prom1/storage/local/chunk/chunk_test.go index 10d67da6bf8..d96bb99d972 100644 --- a/pkg/prom1/storage/local/chunk/chunk_test.go +++ b/pkg/prom1/storage/local/chunk/chunk_test.go @@ -145,12 +145,12 @@ func testChunkSeek(t *testing.T, encoding Encoding, samples int) { require.EqualValues(t, model.Time(i*step), sample.Timestamp) require.EqualValues(t, model.SampleValue(i), sample.Value) - i++ - for ; i < samples; i++ { + j := i + 1 + for ; j < samples; j++ { require.True(t, iter.Scan()) sample := iter.Value() - require.EqualValues(t, model.Time(i*step), sample.Timestamp) - require.EqualValues(t, model.SampleValue(i), sample.Value) + require.EqualValues(t, model.Time(j*step), sample.Timestamp) + require.EqualValues(t, model.SampleValue(j), sample.Value) } require.False(t, iter.Scan()) require.NoError(t, iter.Err()) From e8b764ec9902e950159a052557ff15b2eedbbbfc Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 16 Oct 2018 15:00:23 +0100 Subject: [PATCH 05/11] When ingester streaming, 
slice biggerchunks into smaller chunks. Signed-off-by: Tom Wilkie --- pkg/ingester/ingester.go | 6 +- pkg/ingester/series.go | 8 +++ pkg/prom1/storage/local/chunk/bigchunk.go | 18 ++++++ .../storage/local/chunk/bigchunk_test.go | 57 +++++++++++++++++++ pkg/prom1/storage/local/chunk/chunk.go | 4 ++ pkg/prom1/storage/local/chunk/delta.go | 4 ++ pkg/prom1/storage/local/chunk/doubledelta.go | 4 ++ pkg/prom1/storage/local/chunk/varbit.go | 4 ++ 8 files changed, 102 insertions(+), 3 deletions(-) create mode 100644 pkg/prom1/storage/local/chunk/bigchunk_test.go diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 00548133180..c04a0d8f8bc 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -366,7 +366,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client // QueryStream implements service.IngesterServer func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { - _, _, matchers, err := client.FromQueryRequest(req) + from, through, matchers, err := client.FromQueryRequest(req) if err != nil { return err } @@ -392,8 +392,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ numSeries++ chunks := make([]*desc, 0, len(series.chunkDescs)) for _, chunk := range series.chunkDescs { - if !(chunk.FirstTime.After(model.Time(req.EndTimestampMs)) || chunk.LastTime.Before(model.Time(req.StartTimestampMs))) { - chunks = append(chunks, chunk) + if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { + chunks = append(chunks, chunk.slice(from, through)) } } diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index ba399d78bab..3136cd0575c 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -245,3 +245,11 @@ func (d *desc) add(s model.SamplePair) ([]chunk.Chunk, error) { return cs, nil } + +func (d *desc) slice(start, end model.Time) *desc { + return &desc{ + C: d.C.Slice(start, end), + FirstTime: start, 
+ LastTime: end, + } +} diff --git a/pkg/prom1/storage/local/chunk/bigchunk.go b/pkg/prom1/storage/local/chunk/bigchunk.go index 3d20a03f7cf..7b4265f5d54 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk.go +++ b/pkg/prom1/storage/local/chunk/bigchunk.go @@ -140,6 +140,24 @@ func (b *bigchunk) NewIterator() Iterator { } } +func (b *bigchunk) Slice(start, end model.Time) Chunk { + i, j := 0, len(b.chunks) + for k := 0; k < len(b.chunks); k++ { + if b.ends[k] < int64(start) { + i = k + 1 + } + if b.starts[k] > int64(end) { + j = k + break + } + } + return &bigchunk{ + chunks: b.chunks[i:j], + starts: b.starts[i:j], + ends: b.ends[i:j], + } +} + type writer struct { io.Writer } diff --git a/pkg/prom1/storage/local/chunk/bigchunk_test.go b/pkg/prom1/storage/local/chunk/bigchunk_test.go new file mode 100644 index 00000000000..f1d6894317b --- /dev/null +++ b/pkg/prom1/storage/local/chunk/bigchunk_test.go @@ -0,0 +1,57 @@ +package chunk + +import ( + "testing" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" +) + +func TestSliceBiggerChunk(t *testing.T) { + var c Chunk = newBigchunk() + for i := 0; i < 12*3600/15; i++ { + cs, err := c.Add(model.SamplePair{ + Timestamp: model.Time(i * step), + Value: model.SampleValue(i), + }) + require.NoError(t, err) + c = cs[0] + } + + // Test for when the slice aligns perfectly with the sub-chunk boundaries. + + for i := 0; i < (12*3600/15)-480; i += 120 { + s := c.Slice(model.Time(i*step), model.Time((i+479)*step)) + iter := s.NewIterator() + for j := i; j < i+480; j++ { + require.True(t, iter.Scan()) + sample := iter.Value() + require.Equal(t, sample.Timestamp, model.Time(j*step)) + require.Equal(t, sample.Value, model.SampleValue(j)) + } + require.False(t, iter.Scan()) + require.NoError(t, iter.Err()) + } + + // Test for when the slice does not align perfectly with the sub-chunk boundaries. 
+ for i := 0; i < (12*3600/15)-500; i += 100 { + s := c.Slice(model.Time(i*step), model.Time((i+500)*step)) + iter := s.NewIterator() + + // Consume some samples until we get to where we want to be. + for { + require.True(t, iter.Scan()) + sample := iter.Value() + if sample.Timestamp == model.Time(i*step) { + break + } + } + + for j := i; j < i+500; j++ { + sample := iter.Value() + require.Equal(t, sample.Timestamp, model.Time(j*step)) + require.Equal(t, sample.Value, model.SampleValue(j)) + require.True(t, iter.Scan()) + } + } +} diff --git a/pkg/prom1/storage/local/chunk/chunk.go b/pkg/prom1/storage/local/chunk/chunk.go index 31e2030e2bb..058173687a2 100644 --- a/pkg/prom1/storage/local/chunk/chunk.go +++ b/pkg/prom1/storage/local/chunk/chunk.go @@ -91,6 +91,10 @@ type Chunk interface { Encoding() Encoding Utilization() float64 + // Slice returns a smaller chunk that includes all samples between start and end + // (inclusive). It may over-estimate. On some encodings it is a no-op. + Slice(start, end model.Time) Chunk + // Len returns the number of samples in the chunk. Implementations may be // expensive. Len() int diff --git a/pkg/prom1/storage/local/chunk/delta.go b/pkg/prom1/storage/local/chunk/delta.go index 6252668b73d..3e13e5397ef 100644 --- a/pkg/prom1/storage/local/chunk/delta.go +++ b/pkg/prom1/storage/local/chunk/delta.go @@ -180,6 +180,10 @@ func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { return []Chunk{&c}, nil } +func (c *deltaEncodedChunk) Slice(_, _ model.Time) Chunk { + return c +} + // NewIterator implements chunk. 
func (c *deltaEncodedChunk) NewIterator() Iterator { return newIndexAccessingChunkIterator(c.Len(), &deltaEncodedIndexAccessor{ diff --git a/pkg/prom1/storage/local/chunk/doubledelta.go b/pkg/prom1/storage/local/chunk/doubledelta.go index d171e28bf9e..cc77e9f8e45 100644 --- a/pkg/prom1/storage/local/chunk/doubledelta.go +++ b/pkg/prom1/storage/local/chunk/doubledelta.go @@ -206,6 +206,10 @@ func (c *doubleDeltaEncodedChunk) NewIterator() Iterator { }) } +func (c *doubleDeltaEncodedChunk) Slice(_, _ model.Time) Chunk { + return c +} + // Marshal implements chunk. func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error { if len(c) > math.MaxUint16 { diff --git a/pkg/prom1/storage/local/chunk/varbit.go b/pkg/prom1/storage/local/chunk/varbit.go index 600678fab58..6cfe30c2d43 100644 --- a/pkg/prom1/storage/local/chunk/varbit.go +++ b/pkg/prom1/storage/local/chunk/varbit.go @@ -280,6 +280,10 @@ func (c varbitChunk) NewIterator() Iterator { return newVarbitChunkIterator(c) } +func (c *varbitChunk) Slice(_, _ model.Time) Chunk { + return c +} + // Marshal implements chunk. func (c varbitChunk) Marshal(w io.Writer) error { n, err := w.Write(c) From 0582c4f08730389af08c714cf12bf607ed7f0c0e Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 30 Oct 2018 13:57:59 -0400 Subject: [PATCH 06/11] Record histogram of chunk size. 
Signed-off-by: Tom Wilkie --- pkg/ingester/flush.go | 10 ++++++++-- pkg/prom1/storage/local/chunk/bigchunk.go | 8 ++++++++ pkg/prom1/storage/local/chunk/chunk.go | 3 +++ pkg/prom1/storage/local/chunk/delta.go | 4 ++++ pkg/prom1/storage/local/chunk/doubledelta.go | 4 ++++ pkg/prom1/storage/local/chunk/varbit.go | 4 ++++ 6 files changed, 31 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index f51391bad7a..2971d1fda4f 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -35,6 +35,11 @@ var ( Help: "Distribution of stored chunk lengths (when stored).", Buckets: prometheus.ExponentialBuckets(5, 2, 11), // biggest bucket is 5*2^(11-1) = 5120 }) + chunkSize = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_ingester_chunk_size_bytes", + Help: "Distribution of stored chunk sizes (when stored).", + Buckets: prometheus.ExponentialBuckets(10, 10, 5), // biggest bucket is 10*10^(5-1) = 100000 + }) chunkAge = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_chunk_age_seconds", Help: "Distribution of chunk ages (when stored).", @@ -312,10 +317,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, metric // Record statistsics only when actual put request did not return error. 
for _, chunkDesc := range chunkDescs { - utilization, length := chunkDesc.C.Utilization(), chunkDesc.C.Len() - util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime) + utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() + util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime) chunkUtilization.Observe(utilization) chunkLength.Observe(float64(length)) + chunkSize.Observe(float64(size)) chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) } diff --git a/pkg/prom1/storage/local/chunk/bigchunk.go b/pkg/prom1/storage/local/chunk/bigchunk.go index 7b4265f5d54..54a26e232be 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk.go +++ b/pkg/prom1/storage/local/chunk/bigchunk.go @@ -134,6 +134,14 @@ func (b *bigchunk) Len() int { return sum } +func (b *bigchunk) Size() int { + sum := 0 + for _, c := range b.chunks { + sum += len(c.Bytes()) + } + return sum +} + func (b *bigchunk) NewIterator() Iterator { return &bigchunkIterator{ bigchunk: b, diff --git a/pkg/prom1/storage/local/chunk/chunk.go b/pkg/prom1/storage/local/chunk/chunk.go index 058173687a2..679450e5a92 100644 --- a/pkg/prom1/storage/local/chunk/chunk.go +++ b/pkg/prom1/storage/local/chunk/chunk.go @@ -98,6 +98,9 @@ type Chunk interface { // Len returns the number of samples in the chunk. Implementations may be // expensive. Len() int + + // Size returns the approximate length of the chunk in bytes. + Size() int } // Iterator enables efficient access to the content of a chunk. 
It is diff --git a/pkg/prom1/storage/local/chunk/delta.go b/pkg/prom1/storage/local/chunk/delta.go index 3e13e5397ef..c28528d4a18 100644 --- a/pkg/prom1/storage/local/chunk/delta.go +++ b/pkg/prom1/storage/local/chunk/delta.go @@ -294,6 +294,10 @@ func (c deltaEncodedChunk) Len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } +func (c deltaEncodedChunk) Size() int { + return len(c) +} + // deltaEncodedIndexAccessor implements indexAccessor. type deltaEncodedIndexAccessor struct { c deltaEncodedChunk diff --git a/pkg/prom1/storage/local/chunk/doubledelta.go b/pkg/prom1/storage/local/chunk/doubledelta.go index cc77e9f8e45..e9ab14e7969 100644 --- a/pkg/prom1/storage/local/chunk/doubledelta.go +++ b/pkg/prom1/storage/local/chunk/doubledelta.go @@ -355,6 +355,10 @@ func (c doubleDeltaEncodedChunk) Len() int { return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2 } +func (c doubleDeltaEncodedChunk) Size() int { + return len(c) +} + func (c doubleDeltaEncodedChunk) isInt() bool { return c[doubleDeltaHeaderIsIntOffset] == 1 } diff --git a/pkg/prom1/storage/local/chunk/varbit.go b/pkg/prom1/storage/local/chunk/varbit.go index 6cfe30c2d43..579eebb4347 100644 --- a/pkg/prom1/storage/local/chunk/varbit.go +++ b/pkg/prom1/storage/local/chunk/varbit.go @@ -328,6 +328,10 @@ func (c varbitChunk) Len() int { return i } +func (c varbitChunk) Size() int { + return len(c) +} + func (c varbitChunk) firstTime() model.Time { return model.Time( binary.BigEndian.Uint64( From 0e2d316ac5960516d6c249f8d6cb34d1a4e74121 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Thu, 8 Nov 2018 15:15:19 +0000 Subject: [PATCH 07/11] Compact subchunks to remove cap-len over-allocated bytes. Saves ~30% memory. 
Signed-off-by: Tom Wilkie --- pkg/prom1/storage/local/chunk/bigchunk.go | 40 ++++++++++++++----- .../storage/local/chunk/bigchunk_test.go | 32 +++++++++++++++ 2 files changed, 62 insertions(+), 10 deletions(-) diff --git a/pkg/prom1/storage/local/chunk/bigchunk.go b/pkg/prom1/storage/local/chunk/bigchunk.go index 54a26e232be..ad272b7d2e0 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk.go +++ b/pkg/prom1/storage/local/chunk/bigchunk.go @@ -32,18 +32,9 @@ func newBigchunk() *bigchunk { func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { if b.remainingSamples == 0 { - chunk := chunkenc.NewXORChunk() - appender, err := chunk.Appender() - if err != nil { + if err := b.addNextChunk(sample.Timestamp); err != nil { return nil, err } - - b.starts = append(b.starts, int64(sample.Timestamp)) - b.ends = append(b.ends, int64(sample.Timestamp)) - b.chunks = append(b.chunks, chunk) - - b.appender = appender - b.remainingSamples = samplesPerChunk } b.appender.Append(int64(sample.Timestamp), float64(sample.Value)) @@ -52,6 +43,35 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { return []Chunk{b}, nil } +// addNextChunk adds a new XOR "subchunk" to the internal list of chunks. +func (b *bigchunk) addNextChunk(start model.Time) error { + // To save memory, we "compact" the last chunk. 
+ if l := len(b.chunks); l > 0 { + c := b.chunks[l-1] + buf := make([]byte, len(c.Bytes())) + copy(buf, c.Bytes()) + compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf) + if err != nil { + return err + } + b.chunks[l-1] = compacted + } + + chunk := chunkenc.NewXORChunk() + appender, err := chunk.Appender() + if err != nil { + return err + } + + b.starts = append(b.starts, int64(start)) + b.ends = append(b.ends, int64(start)) + b.chunks = append(b.chunks, chunk) + + b.appender = appender + b.remainingSamples = samplesPerChunk + return nil +} + func (b *bigchunk) Marshal(wio io.Writer) error { w := writer{wio} if err := w.WriteVarInt16(uint16(len(b.chunks))); err != nil { diff --git a/pkg/prom1/storage/local/chunk/bigchunk_test.go b/pkg/prom1/storage/local/chunk/bigchunk_test.go index f1d6894317b..23c40468962 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk_test.go +++ b/pkg/prom1/storage/local/chunk/bigchunk_test.go @@ -1,6 +1,8 @@ package chunk import ( + "bytes" + "fmt" "testing" "github.com/prometheus/common/model" @@ -55,3 +57,33 @@ func TestSliceBiggerChunk(t *testing.T) { } } } + +func BenchmarkBiggerChunkMemory(b *testing.B) { + for i := 0; i < b.N; i++ { + var c Chunk = newBigchunk() + for i := 0; i < 12*3600/15; i++ { + cs, err := c.Add(model.SamplePair{ + Timestamp: model.Time(i * step), + Value: model.SampleValue(i), + }) + require.NoError(b, err) + c = cs[0] + } + + c.(*bigchunk).printSize() + } +} + +// printSize calculates various sizes of the chunk when encoded, and in memory. 
+func (b *bigchunk) printSize() { + var buf bytes.Buffer + b.Marshal(&buf) + + var size, allocd int + for _, c := range b.chunks { + size += len(c.Bytes()) + allocd += cap(c.Bytes()) + } + + fmt.Println("encodedlen =", len(buf.Bytes()), "subchunks =", len(b.chunks), "len =", size, "cap =", allocd) +} From e306d3a6ff69e3ec79dcabd60bc404b5cc907c60 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 9 Nov 2018 11:57:54 +0000 Subject: [PATCH 08/11] Move pkg/prom1/storage/local/chunk to pkg/chunk/encoding Signed-off-by: Tom Wilkie --- pkg/chunk/cache/cache_test.go | 4 +- pkg/chunk/chunk.go | 2 +- pkg/chunk/chunk_store_test.go | 38 +++++++++---------- pkg/chunk/chunk_test.go | 20 +++++----- .../chunk => chunk/encoding}/bigchunk.go | 2 +- .../chunk => chunk/encoding}/bigchunk_test.go | 2 +- .../local/chunk => chunk/encoding}/chunk.go | 2 +- .../chunk => chunk/encoding}/chunk_test.go | 2 +- .../local/chunk => chunk/encoding}/delta.go | 2 +- .../chunk => chunk/encoding}/delta_helpers.go | 2 +- .../chunk => chunk/encoding}/delta_test.go | 2 +- .../chunk => chunk/encoding}/doubledelta.go | 2 +- .../encoding}/instrumentation.go | 2 +- .../local/chunk => chunk/encoding}/varbit.go | 2 +- .../encoding}/varbit_helpers.go | 2 +- .../chunk => chunk/encoding}/varbit_test.go | 2 +- pkg/chunk/testutils/testutils.go | 6 +-- pkg/distributor/distributor_test.go | 4 +- pkg/ingester/ingester.go | 4 +- pkg/ingester/series.go | 22 +++++------ pkg/ingester/transfer.go | 6 +-- pkg/querier/batch/batch.go | 2 +- pkg/querier/batch/chunk.go | 2 +- pkg/querier/batch/chunk_test.go | 2 +- pkg/querier/batch/merge.go | 2 +- pkg/querier/batch/non_overlapping.go | 2 +- pkg/querier/batch/stream.go | 2 +- pkg/querier/batch/stream_test.go | 2 +- pkg/querier/chunk_store_queryable_test.go | 2 +- pkg/querier/iterators/chunk_iterator.go | 2 +- .../iterators/chunk_merge_iterator_test.go | 2 +- pkg/querier/querier_test.go | 2 +- pkg/util/chunkcompat/compat.go | 2 +- 33 files changed, 77 insertions(+), 77 
deletions(-) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/bigchunk.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/bigchunk_test.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/chunk.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/chunk_test.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/delta.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/delta_helpers.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/delta_test.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/doubledelta.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/instrumentation.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/varbit.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/varbit_helpers.go (99%) rename pkg/{prom1/storage/local/chunk => chunk/encoding}/varbit_test.go (98%) diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index 706d340a680..d4753dbe902 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -12,7 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" - prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" ) @@ -37,7 +37,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, promChunk[0], ts, diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index 1aa80308dd6..e2e604a38ed 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -10,7 +10,7 @@ import ( "strings" "sync" - prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + prom_chunk 
"github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" "github.com/golang/snappy" jsoniter "github.com/json-iterator/go" diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 310ea8fbe16..a2de276d6fc 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -16,7 +16,7 @@ import ( "golang.org/x/net/context" "github.com/cortexproject/cortex/pkg/chunk/cache" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/extract" "github.com/cortexproject/cortex/pkg/util/validation" @@ -135,25 +135,25 @@ func TestChunkStore_Get(t *testing.T) { fooMetric1 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", } fooMetric2 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", } // barMetric1 is a subset of barMetric2 to test over-matching bug. 
barMetric1 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", + "bar": "baz", } barMetric2 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } fooChunk1 := dummyChunkFor(now, fooMetric1) @@ -315,14 +315,14 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { now := model.Now() chunk1 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", }) chunk2 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", }) for _, tc := range []struct { @@ -415,7 +415,7 @@ func TestChunkStoreRandom(t *testing.T) { const chunkLen = 2 * 3600 // in seconds for i := 0; i < 100; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := chunk.New().Add(model.SamplePair{ + chunks, _ := encoding.New().Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(float64(i)), }) @@ -424,7 +424,7 @@ func TestChunkStoreRandom(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -479,7 +479,7 @@ func TestChunkStoreLeastRead(t *testing.T) { const chunkLen = 60 // in seconds for i := 0; i < 24; i++ { ts := model.TimeFromUnix(int64(i * chunkLen)) - chunks, _ := chunk.New().Add(model.SamplePair{ + chunks, _ := encoding.New().Add(model.SamplePair{ Timestamp: ts, Value: model.SampleValue(float64(i)), }) @@ -488,7 +488,7 @@ func TestChunkStoreLeastRead(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -534,7 +534,7 @@ func TestIndexCachingWorks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) metric := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", } storeMaker := stores[1] storeCfg := storeMaker.configFn() 
diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index 7882eb0fe17..4db4aec1b19 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/util" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -19,13 +19,13 @@ const userID = "userID" func dummyChunk(now model.Time) Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", }) } -func dummyChunkForEncoding(now model.Time, metric model.Metric, encoding chunk.Encoding, samples int) Chunk { - c, _ := chunk.NewForEncoding(encoding) +func dummyChunkForEncoding(now model.Time, metric model.Metric, enc encoding.Encoding, samples int) Chunk { + c, _ := encoding.NewForEncoding(enc) for i := 0; i < samples; i++ { t := time.Duration(i) * 15 * time.Second cs, err := c.Add(model.SamplePair{Timestamp: now.Add(t), Value: 0}) @@ -51,7 +51,7 @@ func dummyChunkForEncoding(now model.Time, metric model.Metric, encoding chunk.E } func dummyChunkFor(now model.Time, metric model.Metric) Chunk { - return dummyChunkForEncoding(now, metric, chunk.Varbit, 1) + return dummyChunkForEncoding(now, metric, encoding.Varbit, 1) } func TestChunkCodec(t *testing.T) { @@ -150,8 +150,8 @@ func TestChunksToMatrix(t *testing.T) { // Create 2 chunks which have the same metric metric := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } now := model.Now() chunk1 := dummyChunkFor(now, metric) @@ -169,8 +169,8 @@ func TestChunksToMatrix(t *testing.T) { // Create another chunk with a different metric otherMetric := model.Metric{ model.MetricNameLabel: "foo2", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } chunk3 := dummyChunkFor(now, otherMetric) 
chunk3Samples, err := chunk3.Samples(chunk3.From, chunk3.Through) diff --git a/pkg/prom1/storage/local/chunk/bigchunk.go b/pkg/chunk/encoding/bigchunk.go similarity index 99% rename from pkg/prom1/storage/local/chunk/bigchunk.go rename to pkg/chunk/encoding/bigchunk.go index ad272b7d2e0..3ae3b9ddc9e 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -1,4 +1,4 @@ -package chunk +package encoding import ( "bytes" diff --git a/pkg/prom1/storage/local/chunk/bigchunk_test.go b/pkg/chunk/encoding/bigchunk_test.go similarity index 99% rename from pkg/prom1/storage/local/chunk/bigchunk_test.go rename to pkg/chunk/encoding/bigchunk_test.go index 23c40468962..6c3bd432eaa 100644 --- a/pkg/prom1/storage/local/chunk/bigchunk_test.go +++ b/pkg/chunk/encoding/bigchunk_test.go @@ -1,4 +1,4 @@ -package chunk +package encoding import ( "bytes" diff --git a/pkg/prom1/storage/local/chunk/chunk.go b/pkg/chunk/encoding/chunk.go similarity index 99% rename from pkg/prom1/storage/local/chunk/chunk.go rename to pkg/chunk/encoding/chunk.go index 679450e5a92..3ed281af2f3 100644 --- a/pkg/prom1/storage/local/chunk/chunk.go +++ b/pkg/chunk/encoding/chunk.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "errors" diff --git a/pkg/prom1/storage/local/chunk/chunk_test.go b/pkg/chunk/encoding/chunk_test.go similarity index 99% rename from pkg/prom1/storage/local/chunk/chunk_test.go rename to pkg/chunk/encoding/chunk_test.go index d96bb99d972..b4a3acf7313 100644 --- a/pkg/prom1/storage/local/chunk/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -15,7 +15,7 @@ // it may make sense to split those out later, but given that the tests are // near-identical and share a helper, this feels simpler for now. 
-package chunk +package encoding import ( "bytes" diff --git a/pkg/prom1/storage/local/chunk/delta.go b/pkg/chunk/encoding/delta.go similarity index 99% rename from pkg/prom1/storage/local/chunk/delta.go rename to pkg/chunk/encoding/delta.go index c28528d4a18..dfe4659d1d5 100644 --- a/pkg/prom1/storage/local/chunk/delta.go +++ b/pkg/chunk/encoding/delta.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "encoding/binary" diff --git a/pkg/prom1/storage/local/chunk/delta_helpers.go b/pkg/chunk/encoding/delta_helpers.go similarity index 99% rename from pkg/prom1/storage/local/chunk/delta_helpers.go rename to pkg/chunk/encoding/delta_helpers.go index 7b64db53f06..c34ab8555a3 100644 --- a/pkg/prom1/storage/local/chunk/delta_helpers.go +++ b/pkg/chunk/encoding/delta_helpers.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "math" diff --git a/pkg/prom1/storage/local/chunk/delta_test.go b/pkg/chunk/encoding/delta_test.go similarity index 99% rename from pkg/prom1/storage/local/chunk/delta_test.go rename to pkg/chunk/encoding/delta_test.go index 0e95814d7e4..01700861dc3 100644 --- a/pkg/prom1/storage/local/chunk/delta_test.go +++ b/pkg/chunk/encoding/delta_test.go @@ -18,7 +18,7 @@ // it may make sense to split those out later, but given that the tests are // near-identical and share a helper, this feels simpler for now. 
-package chunk +package encoding import ( "bytes" diff --git a/pkg/prom1/storage/local/chunk/doubledelta.go b/pkg/chunk/encoding/doubledelta.go similarity index 99% rename from pkg/prom1/storage/local/chunk/doubledelta.go rename to pkg/chunk/encoding/doubledelta.go index e9ab14e7969..59d9f95da2b 100644 --- a/pkg/prom1/storage/local/chunk/doubledelta.go +++ b/pkg/chunk/encoding/doubledelta.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import ( "encoding/binary" diff --git a/pkg/prom1/storage/local/chunk/instrumentation.go b/pkg/chunk/encoding/instrumentation.go similarity index 99% rename from pkg/prom1/storage/local/chunk/instrumentation.go rename to pkg/chunk/encoding/instrumentation.go index c28c9950cec..241b88c48bc 100644 --- a/pkg/prom1/storage/local/chunk/instrumentation.go +++ b/pkg/chunk/encoding/instrumentation.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/prom1/storage/local/chunk/varbit.go b/pkg/chunk/encoding/varbit.go similarity index 99% rename from pkg/prom1/storage/local/chunk/varbit.go rename to pkg/chunk/encoding/varbit.go index 579eebb4347..91fc3eaad47 100644 --- a/pkg/prom1/storage/local/chunk/varbit.go +++ b/pkg/chunk/encoding/varbit.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package chunk +package encoding import ( "encoding/binary" diff --git a/pkg/prom1/storage/local/chunk/varbit_helpers.go b/pkg/chunk/encoding/varbit_helpers.go similarity index 99% rename from pkg/prom1/storage/local/chunk/varbit_helpers.go rename to pkg/chunk/encoding/varbit_helpers.go index 750979404f4..9ca639e9421 100644 --- a/pkg/prom1/storage/local/chunk/varbit_helpers.go +++ b/pkg/chunk/encoding/varbit_helpers.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunk +package encoding import "github.com/prometheus/common/model" diff --git a/pkg/prom1/storage/local/chunk/varbit_test.go b/pkg/chunk/encoding/varbit_test.go similarity index 98% rename from pkg/prom1/storage/local/chunk/varbit_test.go rename to pkg/chunk/encoding/varbit_test.go index c0c1084878e..d5f0cd7eeaf 100644 --- a/pkg/prom1/storage/local/chunk/varbit_test.go +++ b/pkg/chunk/encoding/varbit_test.go @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package chunk +package encoding import "testing" diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go index 8d0f0e102b3..ae1da7a3aad 100644 --- a/pkg/chunk/testutils/testutils.go +++ b/pkg/chunk/testutils/testutils.go @@ -5,7 +5,7 @@ import ( "strconv" "time" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" "github.com/cortexproject/cortex/pkg/chunk" @@ -70,8 +70,8 @@ func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { func dummyChunk(now model.Time) chunk.Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", }) } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 6f8cb951d4f..bdb09052502 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -20,8 +20,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" @@ -470,7 +470,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest continue } - c := chunk.New() + c := encoding.New() for _, sample := range ts.Samples { cs, err := c.Add(model.SamplePair{ Timestamp: model.Time(sample.TimestampMs), diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index c04a0d8f8bc..02be577082b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -12,7 +12,7 @@ import ( old_ctx "golang.org/x/net/context" "google.golang.org/grpc/health/grpc_health_v1" - 
"github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -156,7 +156,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c cfg.ingesterClientFactory = client.MakeIngesterClient } - if err := chunk.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil { + if err := encoding.DefaultEncoding.Set(cfg.ChunkEncoding); err != nil { return nil, err } diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 3136cd0575c..f7958a23e50 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -7,7 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" ) @@ -90,7 +90,7 @@ func (s *memorySeries) add(v model.SamplePair) error { } if len(s.chunkDescs) == 0 || s.headChunkClosed { - newHead := newDesc(chunk.New(), v.Timestamp, v.Timestamp) + newHead := newDesc(encoding.New(), v.Timestamp, v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkClosed = false createdChunks.Inc() @@ -125,7 +125,7 @@ func (s *memorySeries) add(v model.SamplePair) error { return nil } -func firstAndLastTimes(c chunk.Chunk) (model.Time, model.Time, error) { +func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) { var ( first model.Time last model.Time @@ -191,7 +191,7 @@ func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.Sample } for idx := fromIdx; idx <= throughIdx; idx++ { cd := s.chunkDescs[idx] - chValues, err := chunk.RangeValues(cd.C.NewIterator(), in) + chValues, err := encoding.RangeValues(cd.C.NewIterator(), in) if err != nil { return nil, err } @@ -213,14 +213,14 
@@ func (s *memorySeries) setChunks(descs []*desc) error { } type desc struct { - C chunk.Chunk // nil if chunk is evicted. - FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. - LastTime model.Time // Timestamp of last sample. Populated at creation & on append. - LastUpdate model.Time // This server's local time on last change - flushed bool // set to true when flush succeeds + C encoding.Chunk // nil if chunk is evicted. + FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. + LastTime model.Time // Timestamp of last sample. Populated at creation & on append. + LastUpdate model.Time // This server's local time on last change + flushed bool // set to true when flush succeeds } -func newDesc(c chunk.Chunk, firstTime model.Time, lastTime model.Time) *desc { +func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc { return &desc{ C: c, FirstTime: firstTime, @@ -232,7 +232,7 @@ func newDesc(c chunk.Chunk, firstTime model.Time, lastTime model.Time) *desc { // Add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. 
-func (d *desc) add(s model.SamplePair) ([]chunk.Chunk, error) { +func (d *desc) add(s model.SamplePair) ([]encoding.Chunk, error) { cs, err := d.C.Add(s) if err != nil { return nil, err diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index d245e62e073..ae241aba74b 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -12,8 +12,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util" "github.com/weaveworks/common/user" @@ -149,7 +149,7 @@ func toWireChunks(descs []*desc) ([]client.Chunk, error) { Encoding: int32(d.C.Encoding()), } - buf := bytes.NewBuffer(make([]byte, 0, chunk.ChunkLen)) + buf := bytes.NewBuffer(make([]byte, 0, encoding.ChunkLen)) if err := d.C.Marshal(buf); err != nil { return nil, err } @@ -170,7 +170,7 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { } var err error - desc.C, err = chunk.NewForEncoding(chunk.Encoding(byte(c.Encoding))) + desc.C, err = encoding.NewForEncoding(encoding.Encoding(byte(c.Encoding))) if err != nil { return nil, err } diff --git a/pkg/querier/batch/batch.go b/pkg/querier/batch/batch.go index bfc4444ae2f..480b869ddcd 100644 --- a/pkg/querier/batch/batch.go +++ b/pkg/querier/batch/batch.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" ) diff --git a/pkg/querier/batch/chunk.go b/pkg/querier/batch/chunk.go index 2ae3ab955b7..007f2cf60aa 100644 --- a/pkg/querier/batch/chunk.go +++ b/pkg/querier/batch/chunk.go @@ -2,7 +2,7 @@ 
package batch import ( "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" ) diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index cf46e895c81..2ff48d46da5 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) const ( diff --git a/pkg/querier/batch/merge.go b/pkg/querier/batch/merge.go index 7a3637b6c59..6823de5c170 100644 --- a/pkg/querier/batch/merge.go +++ b/pkg/querier/batch/merge.go @@ -5,7 +5,7 @@ import ( "sort" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) type mergeIterator struct { diff --git a/pkg/querier/batch/non_overlapping.go b/pkg/querier/batch/non_overlapping.go index 2a2eb8e0e16..075ce7b049c 100644 --- a/pkg/querier/batch/non_overlapping.go +++ b/pkg/querier/batch/non_overlapping.go @@ -2,7 +2,7 @@ package batch import ( "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) const bufferBatches = 4 diff --git a/pkg/querier/batch/stream.go b/pkg/querier/batch/stream.go index 728544c661b..3a1946aefb5 100644 --- a/pkg/querier/batch/stream.go +++ b/pkg/querier/batch/stream.go @@ -3,7 +3,7 @@ package batch import ( "fmt" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) // batchStream deals with iteratoring through 
multiple, non-overlapping batches, diff --git a/pkg/querier/batch/stream_test.go b/pkg/querier/batch/stream_test.go index dc582c8ed89..ce379e2e160 100644 --- a/pkg/querier/batch/stream_test.go +++ b/pkg/querier/batch/stream_test.go @@ -4,7 +4,7 @@ import ( "strconv" "testing" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/stretchr/testify/require" ) diff --git a/pkg/querier/chunk_store_queryable_test.go b/pkg/querier/chunk_store_queryable_test.go index 243656ba618..e09a0ecf1db 100644 --- a/pkg/querier/chunk_store_queryable_test.go +++ b/pkg/querier/chunk_store_queryable_test.go @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) func TestChunkQueryable(t *testing.T) { diff --git a/pkg/querier/iterators/chunk_iterator.go b/pkg/querier/iterators/chunk_iterator.go index 8bfdfcbda4d..6971487375a 100644 --- a/pkg/querier/iterators/chunk_iterator.go +++ b/pkg/querier/iterators/chunk_iterator.go @@ -2,7 +2,7 @@ package iterators import ( "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" ) diff --git a/pkg/querier/iterators/chunk_merge_iterator_test.go b/pkg/querier/iterators/chunk_merge_iterator_test.go index f23684f19d7..074d1ba8237 100644 --- a/pkg/querier/iterators/chunk_merge_iterator_test.go +++ b/pkg/querier/iterators/chunk_merge_iterator_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" ) 
const ( diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index f55c10db87e..8369fd572f0 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -14,8 +14,8 @@ import ( "github.com/stretchr/testify/require" "github.com/cortexproject/cortex/pkg/chunk" + promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - promchunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/querier/iterators" "github.com/cortexproject/cortex/pkg/util" diff --git a/pkg/util/chunkcompat/compat.go b/pkg/util/chunkcompat/compat.go index 8a2c7179b73..446f7a4544d 100644 --- a/pkg/util/chunkcompat/compat.go +++ b/pkg/util/chunkcompat/compat.go @@ -6,8 +6,8 @@ import ( "github.com/prometheus/common/model" "github.com/cortexproject/cortex/pkg/chunk" + prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/ingester/client" - prom_chunk "github.com/cortexproject/cortex/pkg/prom1/storage/local/chunk" "github.com/cortexproject/cortex/pkg/util" ) From 7958330ee771b10bf7bdd1ebcbd6c4072284b441 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 9 Nov 2018 12:05:57 +0000 Subject: [PATCH 09/11] gofmt 1.10 Signed-off-by: Tom Wilkie --- pkg/chunk/cache/cache_test.go | 2 +- pkg/chunk/chunk_store_test.go | 32 ++++++++++++++++---------------- pkg/chunk/chunk_test.go | 12 ++++++------ pkg/chunk/testutils/testutils.go | 4 ++-- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pkg/chunk/cache/cache_test.go b/pkg/chunk/cache/cache_test.go index d4753dbe902..b87fb83921b 100644 --- a/pkg/chunk/cache/cache_test.go +++ b/pkg/chunk/cache/cache_test.go @@ -37,7 +37,7 @@ func fillCache(t *testing.T, cache cache.Cache) ([]string, []chunk.Chunk) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, promChunk[0], 
ts, diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index a2de276d6fc..d123b39be23 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -135,25 +135,25 @@ func TestChunkStore_Get(t *testing.T) { fooMetric1 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", } fooMetric2 := model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", } // barMetric1 is a subset of barMetric2 to test over-matching bug. barMetric1 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", + "bar": "baz", } barMetric2 := model.Metric{ model.MetricNameLabel: "bar", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } fooChunk1 := dummyChunkFor(now, fooMetric1) @@ -315,14 +315,14 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { now := model.Now() chunk1 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", - "flip": "flop", + "bar": "baz", + "toms": "code", + "flip": "flop", }) chunk2 := dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "beep", - "toms": "code", + "bar": "beep", + "toms": "code", }) for _, tc := range []struct { @@ -424,7 +424,7 @@ func TestChunkStoreRandom(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -488,7 +488,7 @@ func TestChunkStoreLeastRead(t *testing.T) { model.Fingerprint(1), model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", }, chunks[0], ts, @@ -534,7 +534,7 @@ func TestIndexCachingWorks(t *testing.T) { ctx := user.InjectOrgID(context.Background(), userID) metric := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", + "bar": "baz", } storeMaker := stores[1] storeCfg := storeMaker.configFn() diff --git a/pkg/chunk/chunk_test.go 
b/pkg/chunk/chunk_test.go index 4db4aec1b19..a396a665074 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -19,8 +19,8 @@ const userID = "userID" func dummyChunk(now model.Time) Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", }) } @@ -150,8 +150,8 @@ func TestChunksToMatrix(t *testing.T) { // Create 2 chunks which have the same metric metric := model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } now := model.Now() chunk1 := dummyChunkFor(now, metric) @@ -169,8 +169,8 @@ func TestChunksToMatrix(t *testing.T) { // Create another chunk with a different metric otherMetric := model.Metric{ model.MetricNameLabel: "foo2", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", } chunk3 := dummyChunkFor(now, otherMetric) chunk3Samples, err := chunk3.Samples(chunk3.From, chunk3.Through) diff --git a/pkg/chunk/testutils/testutils.go b/pkg/chunk/testutils/testutils.go index ae1da7a3aad..bcbe7b07084 100644 --- a/pkg/chunk/testutils/testutils.go +++ b/pkg/chunk/testutils/testutils.go @@ -70,8 +70,8 @@ func CreateChunks(startIndex, batchSize int) ([]string, []chunk.Chunk, error) { func dummyChunk(now model.Time) chunk.Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", - "bar": "baz", - "toms": "code", + "bar": "baz", + "toms": "code", }) } From a0bf693913d744f69e10d3d3885e3d15557593ee Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 9 Nov 2018 17:46:36 +0000 Subject: [PATCH 10/11] Go back to 120 samples per subchunk, due to increased memory usage. 
Signed-off-by: Tom Wilkie --- pkg/chunk/encoding/bigchunk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index 3ae3b9ddc9e..833d307f690 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -11,7 +11,7 @@ import ( "github.com/prometheus/tsdb/chunkenc" ) -const samplesPerChunk = 60 +const samplesPerChunk = 120 var errOutOfBounds = errors.New("out of bounds") From 53af116b2f6f899e43aa55d79c75d5f4ff086c18 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 19 Nov 2018 13:03:28 +0000 Subject: [PATCH 11/11] Review feedback. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk.go | 2 +- pkg/chunk/encoding/bigchunk.go | 3 +- pkg/querier/batch/chunk_test.go | 26 ++++++++---- pkg/querier/batch/merge_test.go | 49 +++++++++++++---------- pkg/querier/batch/non_overlapping_test.go | 37 +++++++++-------- 5 files changed, 70 insertions(+), 47 deletions(-) diff --git a/pkg/chunk/chunk.go b/pkg/chunk/chunk.go index e2e604a38ed..78de06218be 100644 --- a/pkg/chunk/chunk.go +++ b/pkg/chunk/chunk.go @@ -319,7 +319,7 @@ func (c *Chunk) Decode(decodeContext *DecodeContext, input []byte) error { c.encoded = input remainingData := input[len(input)-r.Len():] - if int(dataLen) > len(remainingData) { + if int(dataLen) != len(remainingData) { return ErrDataLength } diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index 833d307f690..50e8a318bec 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -45,7 +45,8 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) { // addNextChunk adds a new XOR "subchunk" to the internal list of chunks. func (b *bigchunk) addNextChunk(start model.Time) error { - // To save memory, we "compact" the last chunk. + // To save memory, we "compact" the previous chunk - the array backing the slice + // will be up to 2x too big, and we can save this space. 
if l := len(b.chunks); l > 0 { c := b.chunks[l-1] buf := make([]byte, len(c.Bytes())) diff --git a/pkg/querier/batch/chunk_test.go b/pkg/querier/batch/chunk_test.go index 2ff48d46da5..3bdcbebb03f 100644 --- a/pkg/querier/batch/chunk_test.go +++ b/pkg/querier/batch/chunk_test.go @@ -20,18 +20,30 @@ const ( ) func TestChunkIter(t *testing.T) { - chunk := mkChunk(t, 0, 100) - iter := &chunkIterator{} - iter.reset(chunk) - testIter(t, 100, newIteratorAdapter(iter)) - testSeek(t, 100, newIteratorAdapter(iter)) + forEncodings(t, func(t *testing.T, enc promchunk.Encoding) { + chunk := mkChunk(t, 0, 100, enc) + iter := &chunkIterator{} + iter.reset(chunk) + testIter(t, 100, newIteratorAdapter(iter)) + testSeek(t, 100, newIteratorAdapter(iter)) + }) } -func mkChunk(t require.TestingT, from model.Time, points int) chunk.Chunk { +func forEncodings(t *testing.T, f func(t *testing.T, enc promchunk.Encoding)) { + for _, enc := range []promchunk.Encoding{ + promchunk.DoubleDelta, promchunk.Varbit, promchunk.Bigchunk, + } { + t.Run(enc.String(), func(t *testing.T) { + f(t, enc) + }) + } +} + +func mkChunk(t require.TestingT, from model.Time, points int, enc promchunk.Encoding) chunk.Chunk { metric := model.Metric{ model.MetricNameLabel: "foo", } - pc, err := promchunk.NewForEncoding(promchunk.Bigchunk) + pc, err := promchunk.NewForEncoding(enc) require.NoError(t, err) ts := from for i := 0; i < points; i++ { diff --git a/pkg/querier/batch/merge_test.go b/pkg/querier/batch/merge_test.go index dc47a10c2bf..0595cf3100d 100644 --- a/pkg/querier/batch/merge_test.go +++ b/pkg/querier/batch/merge_test.go @@ -5,33 +5,38 @@ import ( "time" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" ) func TestMergeIter(t *testing.T) { - chunk1 := mkChunk(t, 0, 100) - chunk2 := mkChunk(t, model.TimeFromUnix(25), 100) - chunk3 := mkChunk(t, model.TimeFromUnix(50), 100) - chunk4 := mkChunk(t, model.TimeFromUnix(75), 
100) - chunk5 := mkChunk(t, model.TimeFromUnix(100), 100) - iter := newMergeIterator([]chunk.Chunk{chunk1, chunk2, chunk3, chunk4, chunk5}) - testIter(t, 200, newIteratorAdapter(iter)) - testSeek(t, 200, newIteratorAdapter(iter)) + forEncodings(t, func(t *testing.T, enc encoding.Encoding) { + chunk1 := mkChunk(t, 0, 100, enc) + chunk2 := mkChunk(t, model.TimeFromUnix(25), 100, enc) + chunk3 := mkChunk(t, model.TimeFromUnix(50), 100, enc) + chunk4 := mkChunk(t, model.TimeFromUnix(75), 100, enc) + chunk5 := mkChunk(t, model.TimeFromUnix(100), 100, enc) + iter := newMergeIterator([]chunk.Chunk{chunk1, chunk2, chunk3, chunk4, chunk5}) + testIter(t, 200, newIteratorAdapter(iter)) + testSeek(t, 200, newIteratorAdapter(iter)) + }) } func TestMergeHarder(t *testing.T) { - var ( - numChunks = 24 * 15 - chunks = make([]chunk.Chunk, 0) - from = model.Time(0) - offset = 30 - samples = 100 - ) - for i := 0; i < numChunks; i++ { - chunks = append(chunks, mkChunk(t, from, samples)) - from = from.Add(time.Duration(offset) * time.Second) - } - iter := newMergeIterator(chunks) - testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) - testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + forEncodings(t, func(t *testing.T, enc encoding.Encoding) { + var ( + numChunks = 24 * 15 + chunks = make([]chunk.Chunk, 0) + from = model.Time(0) + offset = 30 + samples = 100 + ) + for i := 0; i < numChunks; i++ { + chunks = append(chunks, mkChunk(t, from, samples, enc)) + from = from.Add(time.Duration(offset) * time.Second) + } + iter := newMergeIterator(chunks) + testIter(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + testSeek(t, offset*numChunks+samples-offset, newIteratorAdapter(iter)) + }) } diff --git a/pkg/querier/batch/non_overlapping_test.go b/pkg/querier/batch/non_overlapping_test.go index 297a35ce011..0af38c68eb6 100644 --- a/pkg/querier/batch/non_overlapping_test.go +++ b/pkg/querier/batch/non_overlapping_test.go @@ -4,27 +4,32 @@ 
import ( "testing" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/prometheus/common/model" ) func TestNonOverlappingIter(t *testing.T) { - cs := []chunk.Chunk{} - for i := int64(0); i < 100; i++ { - cs = append(cs, mkChunk(t, model.TimeFromUnix(i*10), 10)) - } - testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) - testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) + forEncodings(t, func(t *testing.T, enc encoding.Encoding) { + cs := []chunk.Chunk{} + for i := int64(0); i < 100; i++ { + cs = append(cs, mkChunk(t, model.TimeFromUnix(i*10), 10, enc)) + } + testIter(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) + testSeek(t, 10*100, newIteratorAdapter(newNonOverlappingIterator(cs))) + }) } func TestNonOverlappingIterSparse(t *testing.T) { - cs := []chunk.Chunk{ - mkChunk(t, model.TimeFromUnix(0), 1), - mkChunk(t, model.TimeFromUnix(1), 3), - mkChunk(t, model.TimeFromUnix(4), 1), - mkChunk(t, model.TimeFromUnix(5), 90), - mkChunk(t, model.TimeFromUnix(95), 1), - mkChunk(t, model.TimeFromUnix(96), 4), - } - testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs))) - testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs))) + forEncodings(t, func(t *testing.T, enc encoding.Encoding) { + cs := []chunk.Chunk{ + mkChunk(t, model.TimeFromUnix(0), 1, enc), + mkChunk(t, model.TimeFromUnix(1), 3, enc), + mkChunk(t, model.TimeFromUnix(4), 1, enc), + mkChunk(t, model.TimeFromUnix(5), 90, enc), + mkChunk(t, model.TimeFromUnix(95), 1, enc), + mkChunk(t, model.TimeFromUnix(96), 4, enc), + } + testIter(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs))) + testSeek(t, 100, newIteratorAdapter(newNonOverlappingIterator(cs))) + }) }