Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ingester"
"github.com/cortexproject/cortex/pkg/ingester/client"
Expand Down Expand Up @@ -47,6 +48,7 @@ func main() {
ingesterConfig ingester.Config
preallocConfig client.PreallocConfig
clientConfig client.Config
marshalConfig encoding.MarshalConfig
limits validation.Limits
eventSampleRate int
maxStreams uint
Expand All @@ -60,7 +62,7 @@ func main() {
ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort

util.RegisterFlags(&serverConfig, &chunkStoreConfig, &storageConfig,
&schemaConfig, &ingesterConfig, &clientConfig, &limits, &preallocConfig)
&schemaConfig, &ingesterConfig, &clientConfig, &limits, &preallocConfig, &marshalConfig)
flag.UintVar(&maxStreams, "ingester.max-concurrent-streams", 1000, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)")
flag.IntVar(&eventSampleRate, "event.sample-rate", 0, "How often to sample observability events (0 = never).")
flag.Parse()
Expand Down
4 changes: 4 additions & 0 deletions pkg/chunk/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (

const userID = "userID"

func init() {
encoding.DefaultEncoding = encoding.Varbit
}

func dummyChunk(now model.Time) Chunk {
return dummyChunkFor(now, model.Metric{
model.MetricNameLabel: "foo",
Expand Down
9 changes: 0 additions & 9 deletions pkg/chunk/encoding/bigchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"io"
"io/ioutil"

"github.com/prometheus/common/model"
"github.com/prometheus/tsdb/chunkenc"
Expand Down Expand Up @@ -95,14 +94,6 @@ func (b *bigchunk) MarshalToBuf(buf []byte) error {
return b.Marshal(writer)
}

func (b *bigchunk) Unmarshal(r io.Reader) error {
buf, err := ioutil.ReadAll(r)
if err != nil {
return err
}
return b.UnmarshalFromBuf(buf)
}

func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
r := reader{buf: buf}
numChunks, err := r.ReadUint16()
Expand Down
1 change: 0 additions & 1 deletion pkg/chunk/encoding/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type Chunk interface {
Add(sample model.SamplePair) ([]Chunk, error)
NewIterator() Iterator
Marshal(io.Writer) error
Unmarshal(io.Reader) error
UnmarshalFromBuf([]byte) error
Encoding() Encoding
Utilization() float64
Expand Down
6 changes: 4 additions & 2 deletions pkg/chunk/encoding/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestLen(t *testing.T) {
var step = int(15 * time.Second / time.Millisecond)

func TestChunk(t *testing.T) {
alwaysMarshalFullsizeChunks = false
for _, tc := range []struct {
encoding Encoding
maxSamples int
Expand Down Expand Up @@ -112,7 +113,7 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {

bs1 := buf.Bytes()
chunk, err = NewForEncoding(encoding)
err = chunk.Unmarshal(&buf)
err = chunk.UnmarshalFromBuf(bs1)
require.NoError(t, err)

// Check all the samples are in there.
Expand All @@ -127,11 +128,12 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) {
require.NoError(t, iter.Err())

// Check the byte representation after another Marshall is the same.
buf = bytes.Buffer{}
err = chunk.Marshal(&buf)
require.NoError(t, err)
bs2 := buf.Bytes()

require.True(t, bytes.Equal(bs1, bs2))
require.Equal(t, bs1, bs2)
}

// testChunkSeek checks seek works as expected.
Expand Down
9 changes: 0 additions & 9 deletions pkg/chunk/encoding/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,6 @@ func (c deltaEncodedChunk) Marshal(w io.Writer) error {
return nil
}

// Unmarshal implements chunk.
func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
return c.setLen()
}

// UnmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
Expand Down
6 changes: 0 additions & 6 deletions pkg/chunk/encoding/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,6 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
err = cs[0].UnmarshalFromBuf(buf)
verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes")

err = cs[0].Unmarshal(bytes.NewBuffer(buf))
verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "invalid number of time bytes")

// Fix the corruption to go on.
buf[c.timeBytesPos] = byte(d1)

Expand All @@ -111,9 +108,6 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {

err = cs[0].UnmarshalFromBuf(buf)
verifyUnmarshallingError(err, c.chunkTypeName, "buf", "header size")

err = cs[0].Unmarshal(bytes.NewBuffer(buf))
verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "header size")
}
}
}
9 changes: 0 additions & 9 deletions pkg/chunk/encoding/doubledelta.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,6 @@ func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
return nil
}

// Unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil {
return err
}
return c.setLen()
}

// UnmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
Expand Down
46 changes: 34 additions & 12 deletions pkg/chunk/encoding/varbit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package encoding

import (
"encoding/binary"
"flag"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -286,26 +287,21 @@ func (c *varbitChunk) Slice(_, _ model.Time) Chunk {

// Marshal implements chunk.
func (c varbitChunk) Marshal(w io.Writer) error {
n, err := w.Write(c)
size := c.marshalLen()
n, err := w.Write(c[:size])
if err != nil {
return err
}
if n != cap(c) {
return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
if n != size {
return fmt.Errorf("wanted to write %d bytes, wrote %d", size, n)
}
return nil
}

// Unmarshal implements chunk.
func (c varbitChunk) Unmarshal(r io.Reader) error {
_, err := io.ReadFull(r, c)
return err
}

// UnmarshalFromBuf implements chunk.
func (c varbitChunk) UnmarshalFromBuf(buf []byte) error {
if copied := copy(c, buf); copied != cap(c) {
return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied)
if copied := copy(c, buf); copied != cap(c) && copied != c.marshalLen() {
return fmt.Errorf("incorrect byte count copied from buffer during unmarshaling, want %d or %d, got %d", c.marshalLen(), ChunkLen, copied)
}
return nil
}
Expand All @@ -319,6 +315,32 @@ func (c varbitChunk) Utilization() float64 {
return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1)
}

// MarshalConfig configures the behaviour of marshalling
type MarshalConfig struct{}

var alwaysMarshalFullsizeChunks = true

// RegisterFlags registers configuration settings.
func (MarshalConfig) RegisterFlags(f *flag.FlagSet) {
flag.BoolVar(&alwaysMarshalFullsizeChunks, "store.fullsize-chunks", alwaysMarshalFullsizeChunks, "When saving varbit chunks, pad to 1024 bytes")
}

// marshalLen returns the number of bytes that should be marshalled for this chunk
func (c varbitChunk) marshalLen() int {
if alwaysMarshalFullsizeChunks {
return cap(c)
}
bits := c.nextSampleOffset()
if bits < varbitThirdSampleBitOffset {
bits = varbitThirdSampleBitOffset
}
bytes := int(bits)/8 + 1
if bytes > len(c) {
bytes = len(c)
}
return bytes
}

// Len implements chunk. Runs in O(n).
func (c varbitChunk) Len() int {
it := c.NewIterator()
Expand All @@ -329,7 +351,7 @@ func (c varbitChunk) Len() int {
}

func (c varbitChunk) Size() int {
return len(c)
return c.marshalLen()
}

func (c varbitChunk) firstTime() model.Time {
Expand Down