From 74cd091140658d2294d680287955f54a476019dc Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Sat, 29 Sep 2018 11:13:08 +0000 Subject: [PATCH 1/3] Write only as many chunk bytes as needed - Only effective for varbit chunks at present. - Also allow undersized varbit chunks to be unmarshalled. - Use varbit encoding in chunk tests, since that's what we use most commonly in production. - Remove chunk.Unmarshal(io.Reader), it's only used in tests. Signed-off-by: Tom Wilkie --- pkg/chunk/chunk_test.go | 4 ++++ pkg/chunk/encoding/bigchunk.go | 9 --------- pkg/chunk/encoding/chunk.go | 1 - pkg/chunk/encoding/chunk_test.go | 5 +++-- pkg/chunk/encoding/delta.go | 9 --------- pkg/chunk/encoding/delta_test.go | 6 ------ pkg/chunk/encoding/doubledelta.go | 9 --------- pkg/chunk/encoding/varbit.go | 30 +++++++++++++++++++----------- 8 files changed, 26 insertions(+), 47 deletions(-) diff --git a/pkg/chunk/chunk_test.go b/pkg/chunk/chunk_test.go index a396a665074..5324719a241 100644 --- a/pkg/chunk/chunk_test.go +++ b/pkg/chunk/chunk_test.go @@ -16,6 +16,10 @@ import ( const userID = "userID" +func init() { + encoding.DefaultEncoding = encoding.Varbit +} + func dummyChunk(now model.Time) Chunk { return dummyChunkFor(now, model.Metric{ model.MetricNameLabel: "foo", diff --git a/pkg/chunk/encoding/bigchunk.go b/pkg/chunk/encoding/bigchunk.go index 50e8a318bec..5be2372ad64 100644 --- a/pkg/chunk/encoding/bigchunk.go +++ b/pkg/chunk/encoding/bigchunk.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "io" - "io/ioutil" "github.com/prometheus/common/model" "github.com/prometheus/tsdb/chunkenc" @@ -95,14 +94,6 @@ func (b *bigchunk) MarshalToBuf(buf []byte) error { return b.Marshal(writer) } -func (b *bigchunk) Unmarshal(r io.Reader) error { - buf, err := ioutil.ReadAll(r) - if err != nil { - return err - } - return b.UnmarshalFromBuf(buf) -} - func (b *bigchunk) UnmarshalFromBuf(buf []byte) error { r := reader{buf: buf} numChunks, err := r.ReadUint16() diff --git 
a/pkg/chunk/encoding/chunk.go b/pkg/chunk/encoding/chunk.go index 3ed281af2f3..1260adf619f 100644 --- a/pkg/chunk/encoding/chunk.go +++ b/pkg/chunk/encoding/chunk.go @@ -86,7 +86,6 @@ type Chunk interface { Add(sample model.SamplePair) ([]Chunk, error) NewIterator() Iterator Marshal(io.Writer) error - Unmarshal(io.Reader) error UnmarshalFromBuf([]byte) error Encoding() Encoding Utilization() float64 diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go index b4a3acf7313..6266c99815b 100644 --- a/pkg/chunk/encoding/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -112,7 +112,7 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) { bs1 := buf.Bytes() chunk, err = NewForEncoding(encoding) - err = chunk.Unmarshal(&buf) + err = chunk.UnmarshalFromBuf(bs1) require.NoError(t, err) // Check all the samples are in there. @@ -127,11 +127,12 @@ func testChunkEncoding(t *testing.T, encoding Encoding, samples int) { require.NoError(t, iter.Err()) // Check the byte representation after another Marshall is the same. + buf = bytes.Buffer{} err = chunk.Marshal(&buf) require.NoError(t, err) bs2 := buf.Bytes() - require.True(t, bytes.Equal(bs1, bs2)) + require.Equal(t, bs1, bs2) } // testChunkSeek checks seek works as expected. diff --git a/pkg/chunk/encoding/delta.go b/pkg/chunk/encoding/delta.go index dfe4659d1d5..2b7a9c76d2c 100644 --- a/pkg/chunk/encoding/delta.go +++ b/pkg/chunk/encoding/delta.go @@ -213,15 +213,6 @@ func (c deltaEncodedChunk) Marshal(w io.Writer) error { return nil } -// Unmarshal implements chunk. -func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error { - *c = (*c)[:cap(*c)] - if _, err := io.ReadFull(r, *c); err != nil { - return err - } - return c.setLen() -} - // UnmarshalFromBuf implements chunk. 
func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] diff --git a/pkg/chunk/encoding/delta_test.go b/pkg/chunk/encoding/delta_test.go index 01700861dc3..6363a5df2e2 100644 --- a/pkg/chunk/encoding/delta_test.go +++ b/pkg/chunk/encoding/delta_test.go @@ -99,9 +99,6 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { err = cs[0].UnmarshalFromBuf(buf) verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes") - err = cs[0].Unmarshal(bytes.NewBuffer(buf)) - verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "invalid number of time bytes") - // Fix the corruption to go on. buf[c.timeBytesPos] = byte(d1) @@ -111,9 +108,6 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { err = cs[0].UnmarshalFromBuf(buf) verifyUnmarshallingError(err, c.chunkTypeName, "buf", "header size") - - err = cs[0].Unmarshal(bytes.NewBuffer(buf)) - verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "header size") } } } diff --git a/pkg/chunk/encoding/doubledelta.go b/pkg/chunk/encoding/doubledelta.go index 59d9f95da2b..8e7852eef3d 100644 --- a/pkg/chunk/encoding/doubledelta.go +++ b/pkg/chunk/encoding/doubledelta.go @@ -241,15 +241,6 @@ func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error { return nil } -// Unmarshal implements chunk. -func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error { - *c = (*c)[:cap(*c)] - if _, err := io.ReadFull(r, *c); err != nil { - return err - } - return c.setLen() -} - // UnmarshalFromBuf implements chunk. func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] diff --git a/pkg/chunk/encoding/varbit.go b/pkg/chunk/encoding/varbit.go index 91fc3eaad47..c7ed63f1c3a 100644 --- a/pkg/chunk/encoding/varbit.go +++ b/pkg/chunk/encoding/varbit.go @@ -286,26 +286,21 @@ func (c *varbitChunk) Slice(_, _ model.Time) Chunk { // Marshal implements chunk. 
func (c varbitChunk) Marshal(w io.Writer) error { - n, err := w.Write(c) + size := c.MarshalLen() + n, err := w.Write(c[:size]) if err != nil { return err } - if n != cap(c) { - return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n) + if n != size { + return fmt.Errorf("wanted to write %d bytes, wrote %d", size, n) } return nil } -// Unmarshal implements chunk. -func (c varbitChunk) Unmarshal(r io.Reader) error { - _, err := io.ReadFull(r, c) - return err -} - // UnmarshalFromBuf implements chunk. func (c varbitChunk) UnmarshalFromBuf(buf []byte) error { - if copied := copy(c, buf); copied != cap(c) { - return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied) + if copied := copy(c, buf); copied != cap(c) && copied != c.MarshalLen() { + return fmt.Errorf("incorrect byte count copied from buffer during unmarshaling, want %d or %d, got %d", c.MarshalLen(), ChunkLen, copied) } return nil } @@ -319,6 +314,19 @@ func (c varbitChunk) Utilization() float64 { return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1) } +// MarshalLen implements chunk. +func (c varbitChunk) MarshalLen() int { + bits := c.nextSampleOffset() + if bits < varbitThirdSampleBitOffset { + bits = varbitThirdSampleBitOffset + } + bytes := int(bits)/8 + 1 + if bytes > len(c) { + bytes = len(c) + } + return bytes +} + // Len implements chunk. Runs in O(n). func (c varbitChunk) Len() int { it := c.NewIterator() From a2a5a3fcac458d3038b8ce8aefbea75ebc581101 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 29 Nov 2018 15:25:41 +0000 Subject: [PATCH 2/3] Add an option to control whether varbit chunks are saved full-size This allows all components to be rolled out in a mode which accepts either size of chunk, then changed over to write the new way at a later date. 
Signed-off-by: Bryan Boreham --- cmd/ingester/main.go | 4 +++- pkg/chunk/encoding/chunk_test.go | 1 + pkg/chunk/encoding/varbit.go | 14 ++++++++++++++ 3 files changed, 18 insertions(+), 1 deletion(-) diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 0e6581f49df..47c4d0a094c 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" @@ -47,6 +48,7 @@ func main() { ingesterConfig ingester.Config preallocConfig client.PreallocConfig clientConfig client.Config + marshalConfig encoding.MarshalConfig limits validation.Limits eventSampleRate int maxStreams uint @@ -60,7 +62,7 @@ func main() { ingesterConfig.LifecyclerConfig.ListenPort = &serverConfig.GRPCListenPort util.RegisterFlags(&serverConfig, &chunkStoreConfig, &storageConfig, - &schemaConfig, &ingesterConfig, &clientConfig, &limits, &preallocConfig) + &schemaConfig, &ingesterConfig, &clientConfig, &limits, &preallocConfig, &marshalConfig) flag.UintVar(&maxStreams, "ingester.max-concurrent-streams", 1000, "Limit on the number of concurrent streams for gRPC calls (0 = unlimited)") flag.IntVar(&eventSampleRate, "event.sample-rate", 0, "How often to sample observability events (0 = never).") flag.Parse() diff --git a/pkg/chunk/encoding/chunk_test.go b/pkg/chunk/encoding/chunk_test.go index 6266c99815b..1256b3e8858 100644 --- a/pkg/chunk/encoding/chunk_test.go +++ b/pkg/chunk/encoding/chunk_test.go @@ -55,6 +55,7 @@ func TestLen(t *testing.T) { var step = int(15 * time.Second / time.Millisecond) func TestChunk(t *testing.T) { + alwaysMarshalFullsizeChunks = false for _, tc := range []struct { encoding Encoding maxSamples int diff --git a/pkg/chunk/encoding/varbit.go 
b/pkg/chunk/encoding/varbit.go index c7ed63f1c3a..387eea9d8ee 100644 --- a/pkg/chunk/encoding/varbit.go +++ b/pkg/chunk/encoding/varbit.go @@ -18,6 +18,7 @@ package encoding import ( "encoding/binary" + "flag" "fmt" "io" "math" @@ -314,8 +315,21 @@ func (c varbitChunk) Utilization() float64 { return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1) } +// MarshalConfig configures the behaviour of marshalling +type MarshalConfig struct{} + +var alwaysMarshalFullsizeChunks = true + +// RegisterFlags registers configuration settings. +func (MarshalConfig) RegisterFlags(f *flag.FlagSet) { + flag.BoolVar(&alwaysMarshalFullsizeChunks, "store.fullsize-chunks", alwaysMarshalFullsizeChunks, "When saving varbit chunks, pad to 1024 bytes") +} + // MarshalLen implements chunk. func (c varbitChunk) MarshalLen() int { + if alwaysMarshalFullsizeChunks { + return cap(c) + } bits := c.nextSampleOffset() if bits < varbitThirdSampleBitOffset { bits = varbitThirdSampleBitOffset From 5f8de0c71507f6360be2e8d76ef89daff49b8b1e Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Thu, 29 Nov 2018 15:54:57 +0000 Subject: [PATCH 3/3] Make varbitChunk.Size() more accurate Also un-export MarshalLen since it is not called from outside. Signed-off-by: Bryan Boreham --- pkg/chunk/encoding/varbit.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/chunk/encoding/varbit.go b/pkg/chunk/encoding/varbit.go index 387eea9d8ee..01a2ed74bb9 100644 --- a/pkg/chunk/encoding/varbit.go +++ b/pkg/chunk/encoding/varbit.go @@ -287,7 +287,7 @@ func (c *varbitChunk) Slice(_, _ model.Time) Chunk { // Marshal implements chunk. func (c varbitChunk) Marshal(w io.Writer) error { - size := c.MarshalLen() + size := c.marshalLen() n, err := w.Write(c[:size]) if err != nil { return err @@ -300,8 +300,8 @@ func (c varbitChunk) Marshal(w io.Writer) error { // UnmarshalFromBuf implements chunk. 
func (c varbitChunk) UnmarshalFromBuf(buf []byte) error { - if copied := copy(c, buf); copied != cap(c) && copied != c.MarshalLen() { - return fmt.Errorf("incorrect byte count copied from buffer during unmarshaling, want %d or %d, got %d", c.MarshalLen(), ChunkLen, copied) + if copied := copy(c, buf); copied != cap(c) && copied != c.marshalLen() { + return fmt.Errorf("incorrect byte count copied from buffer during unmarshaling, want %d or %d, got %d", c.marshalLen(), ChunkLen, copied) } return nil } @@ -325,8 +325,8 @@ func (MarshalConfig) RegisterFlags(f *flag.FlagSet) { flag.BoolVar(&alwaysMarshalFullsizeChunks, "store.fullsize-chunks", alwaysMarshalFullsizeChunks, "When saving varbit chunks, pad to 1024 bytes") } -// MarshalLen implements chunk. -func (c varbitChunk) MarshalLen() int { +// marshalLen returns the number of bytes that should be marshalled for this chunk +func (c varbitChunk) marshalLen() int { if alwaysMarshalFullsizeChunks { return cap(c) } @@ -351,7 +351,7 @@ func (c varbitChunk) Len() int { } func (c varbitChunk) Size() int { - return len(c) + return c.marshalLen() } func (c varbitChunk) firstTime() model.Time {