diff --git a/util/chunk/chunk.go b/util/chunk/chunk.go index 66e88d94fddea..60bc217ac74e5 100644 --- a/util/chunk/chunk.go +++ b/util/chunk/chunk.go @@ -66,12 +66,7 @@ func New(fields []*types.FieldType, cap, maxChunkSize int) *Chunk { } for _, f := range fields { - elemLen := getFixedLen(f) - if elemLen == varElemLen { - chk.columns = append(chk.columns, newVarLenColumn(chk.capacity, nil)) - } else { - chk.columns = append(chk.columns, newFixedLenColumn(elemLen, chk.capacity)) - } + chk.columns = append(chk.columns, NewColumn(f, chk.capacity)) } return chk @@ -310,7 +305,7 @@ func (c *Chunk) AppendRow(row Row) { func (c *Chunk) AppendPartialRow(colIdx int, row Row) { for i, rowCol := range row.c.columns { chkCol := c.columns[colIdx+i] - chkCol.appendNullBitmap(!rowCol.isNull(row.idx)) + chkCol.appendNullBitmap(!rowCol.IsNull(row.idx)) if rowCol.isFixed() { elemLen := len(rowCol.elemBuf) offset := row.idx * elemLen @@ -338,7 +333,7 @@ func (c *Chunk) PreAlloc(row Row) (rowIdx uint32) { rowIdx = uint32(c.NumRows()) for i, srcCol := range row.c.columns { dstCol := c.columns[i] - dstCol.appendNullBitmap(!srcCol.isNull(row.idx)) + dstCol.appendNullBitmap(!srcCol.IsNull(row.idx)) elemLen := len(srcCol.elemBuf) if !srcCol.isFixed() { elemLen = int(srcCol.offsets[row.idx+1] - srcCol.offsets[row.idx]) @@ -421,7 +416,7 @@ func (c *Chunk) Append(other *Chunk, begin, end int) { } } for i := begin; i < end; i++ { - dst.appendNullBitmap(!src.isNull(i)) + dst.appendNullBitmap(!src.IsNull(i)) dst.length++ } } @@ -439,7 +434,7 @@ func (c *Chunk) TruncateTo(numRows int) { col.offsets = col.offsets[:numRows+1] } for i := numRows; i < col.length; i++ { - if col.isNull(i) { + if col.IsNull(i) { col.nullCount-- } } @@ -476,7 +471,7 @@ func (c *Chunk) AppendUint64(colIdx int, u uint64) { // AppendFloat32 appends a float32 value to the chunk. func (c *Chunk) AppendFloat32(colIdx int, f float32) { - c.columns[colIdx].appendFloat32(f) + c.columns[colIdx].AppendFloat32(f) } // AppendFloat64 appends a float64 value to the chunk. @@ -555,6 +550,11 @@ func (c *Chunk) AppendDatum(colIdx int, d *types.Datum) { } } +// Column returns the specific column. +func (c *Chunk) Column(colIdx int) *Column { + return c.columns[colIdx] +} + func writeTime(buf []byte, t types.Time) { binary.BigEndian.PutUint16(buf, uint16(t.Time.Year())) buf[2] = uint8(t.Time.Month()) diff --git a/util/chunk/chunk_util.go b/util/chunk/chunk_util.go index 9bc6dddb73fda..be15dafe44a87 100644 --- a/util/chunk/chunk_util.go +++ b/util/chunk/chunk_util.go @@ -47,7 +47,7 @@ func copySelectedInnerRows(innerColOffset, outerColOffset int, src *Chunk, selec if !selected[i] { continue } - dstCol.appendNullBitmap(!srcCol.isNull(i)) + dstCol.appendNullBitmap(!srcCol.IsNull(i)) dstCol.length++ elemLen := len(srcCol.elemBuf) @@ -59,7 +59,7 @@ func copySelectedInnerRows(innerColOffset, outerColOffset int, src *Chunk, selec if !selected[i] { continue } - dstCol.appendNullBitmap(!srcCol.isNull(i)) + dstCol.appendNullBitmap(!srcCol.IsNull(i)) dstCol.length++ start, end := srcCol.offsets[i], srcCol.offsets[i+1] @@ -86,7 +86,7 @@ func copyOuterRows(innerColOffset, outerColOffset int, src *Chunk, numRows int, } for i, srcCol := range srcCols { dstCol := dst.columns[outerColOffset+i] - dstCol.appendMultiSameNullBitmap(!srcCol.isNull(row.idx), numRows) + dstCol.appendMultiSameNullBitmap(!srcCol.IsNull(row.idx), numRows) dstCol.length += numRows if srcCol.isFixed() { elemLen := len(srcCol.elemBuf) diff --git a/util/chunk/column.go b/util/chunk/column.go index d1ff51f803200..d06b1f2e92669 100644 --- a/util/chunk/column.go +++ b/util/chunk/column.go @@ -14,10 +14,13 @@ package chunk import ( + "reflect" + "time" "unsafe" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" + "github.com/pingcap/tidb/util/hack" ) // AppendDuration appends a duration value into this Column. @@ -46,6 +49,11 @@ func (c *Column) AppendJSON(j json.BinaryJSON) { c.finishAppendVar() } +// AppendSet appends a Set value into this Column. +func (c *Column) AppendSet(set types.Set) { + c.appendNameValue(set.Name, set.Value) +} + // Column stores one column of data in Apache Arrow format. // See https://arrow.apache.org/docs/memory_layout.html type Column struct { @@ -57,6 +65,15 @@ type Column struct { elemBuf []byte } +// NewColumn creates a new column with the specific length and capacity. +func NewColumn(ft *types.FieldType, cap int) *Column { + typeSize := getFixedLen(ft) + if typeSize == varElemLen { + return newVarLenColumn(cap, nil) + } + return newFixedLenColumn(typeSize, cap) +} + func (c *Column) isFixed() bool { return c.elemBuf != nil } @@ -73,7 +90,8 @@ func (c *Column) Reset() { c.data = c.data[:0] } -func (c *Column) isNull(rowIdx int) bool { +// IsNull returns if this row is null. +func (c *Column) IsNull(rowIdx int) bool { nullByte := c.nullBitmap[rowIdx/8] return nullByte&(1<<(uint(rowIdx)&7)) == 0 } @@ -155,8 +173,8 @@ func (c *Column) AppendUint64(u uint64) { c.finishAppendFixed() } -// appendFloat32 appends a float32 value into this Column. -func (c *Column) appendFloat32(f float32) { +// AppendFloat32 appends a float32 value into this Column. +func (c *Column) AppendFloat32(f float32) { *(*float32)(unsafe.Pointer(&c.elemBuf[0])) = f c.finishAppendFixed() } @@ -190,3 +208,104 @@ func (c *Column) AppendTime(t types.Time) { writeTime(c.elemBuf, t) c.finishAppendFixed() } + +// AppendEnum appends a Enum value into this Column. +func (c *Column) AppendEnum(enum types.Enum) { + c.appendNameValue(enum.Name, enum.Value) +} + +const ( + sizeInt64 = int(unsafe.Sizeof(int64(0))) + sizeUint64 = int(unsafe.Sizeof(uint64(0))) + sizeFloat32 = int(unsafe.Sizeof(float32(0))) + sizeFloat64 = int(unsafe.Sizeof(float64(0))) + sizeMyDecimal = int(unsafe.Sizeof(types.MyDecimal{})) +) + +func (c *Column) castSliceHeader(header *reflect.SliceHeader, typeSize int) { + header.Data = uintptr(unsafe.Pointer(&c.data[0])) + header.Len = c.length + header.Cap = cap(c.data) / typeSize +} + +// Int64s returns an int64 slice stored in this Column. +func (c *Column) Int64s() []int64 { + var res []int64 + c.castSliceHeader((*reflect.SliceHeader)(unsafe.Pointer(&res)), sizeInt64) + return res +} + +// Uint64s returns a uint64 slice stored in this Column. +func (c *Column) Uint64s() []uint64 { + var res []uint64 + c.castSliceHeader((*reflect.SliceHeader)(unsafe.Pointer(&res)), sizeUint64) + return res +} + +// Float32s returns a float32 slice stored in this Column. +func (c *Column) Float32s() []float32 { + var res []float32 + c.castSliceHeader((*reflect.SliceHeader)(unsafe.Pointer(&res)), sizeFloat32) + return res +} + +// Float64s returns a float64 slice stored in this Column. +func (c *Column) Float64s() []float64 { + var res []float64 + c.castSliceHeader((*reflect.SliceHeader)(unsafe.Pointer(&res)), sizeFloat64) + return res +} + +// Decimals returns a MyDecimal slice stored in this Column. +func (c *Column) Decimals() []types.MyDecimal { + var res []types.MyDecimal + c.castSliceHeader((*reflect.SliceHeader)(unsafe.Pointer(&res)), sizeMyDecimal) + return res +} + +// GetString returns the string in the specific row. +func (c *Column) GetString(rowID int) string { + return string(hack.String(c.data[c.offsets[rowID]:c.offsets[rowID+1]])) +} + +// GetJSON returns the JSON in the specific row. +func (c *Column) GetJSON(rowID int) json.BinaryJSON { + start := c.offsets[rowID] + return json.BinaryJSON{TypeCode: c.data[start], Value: c.data[start+1 : c.offsets[rowID+1]]} +} + +// GetBytes returns the byte slice in the specific row. +func (c *Column) GetBytes(rowID int) []byte { + return c.data[c.offsets[rowID]:c.offsets[rowID+1]] +} + +// GetEnum returns the Enum in the specific row. +func (c *Column) GetEnum(rowID int) types.Enum { + name, val := c.getNameValue(rowID) + return types.Enum{Name: name, Value: val} +} + +// GetSet returns the Set in the specific row. +func (c *Column) GetSet(rowID int) types.Set { + name, val := c.getNameValue(rowID) + return types.Set{Name: name, Value: val} +} + +// GetTime returns the Time in the specific row. +func (c *Column) GetTime(rowID int) types.Time { + return readTime(c.data[rowID*16:]) +} + +// GetDuration returns the Duration in the specific row. +func (c *Column) GetDuration(rowID int, fillFsp int) types.Duration { + dur := *(*int64)(unsafe.Pointer(&c.data[rowID*8])) + return types.Duration{Duration: time.Duration(dur), Fsp: fillFsp} +} + +func (c *Column) getNameValue(rowID int) (string, uint64) { + start, end := c.offsets[rowID], c.offsets[rowID+1] + if start == end { + return "", 0 + } + return string(hack.String(c.data[start+8 : end])), *(*uint64)(unsafe.Pointer(&c.data[start])) +} diff --git a/util/chunk/column_test.go b/util/chunk/column_test.go index ff577fea30ac4..210e270155b84 100644 --- a/util/chunk/column_test.go +++ b/util/chunk/column_test.go @@ -14,7 +14,13 @@ package chunk import ( + "fmt" + "time" + "github.com/pingcap/check" + "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/types/json" ) func equalColumn(c1, c2 *Column) bool { @@ -67,3 +73,251 @@ func (s *testChunkSuite) TestLargeStringColumnOffset(c *check.C) { col.offsets[0] = 6 << 30 c.Check(col.offsets[0], check.Equals, int64(6<<30)) // test no overflow. } + +func (s *testChunkSuite) TestI64Column(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendInt64(int64(i)) + } + + i64s := col.Int64s() + for i := 0; i < 1024; i++ { + c.Assert(i64s[i], check.Equals, int64(i)) + i64s[i]++ + } + + it := NewIterator4Chunk(chk) + var i int64 + for row := it.Begin(); row != it.End(); row = it.Next() { + c.Assert(row.GetInt64(0), check.Equals, int64(i+1)) + i++ + } +} + +func (s *testChunkSuite) TestF64Column(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeDouble)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendFloat64(float64(i)) + } + + f64s := col.Float64s() + for i := 0; i < 1024; i++ { + c.Assert(f64s[i], check.Equals, float64(i)) + f64s[i] /= 2 + } + + it := NewIterator4Chunk(chk) + var i int64 + for row := it.Begin(); row != it.End(); row = it.Next() { + c.Assert(row.GetFloat64(0), check.Equals, float64(i)/2) + i++ + } +} + +func (s *testChunkSuite) TestF32Column(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeFloat)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendFloat32(float32(i)) + } + + f32s := col.Float32s() + for i := 0; i < 1024; i++ { + c.Assert(f32s[i], check.Equals, float32(i)) + f32s[i] /= 2 + } + + it := NewIterator4Chunk(chk) + var i int64 + for row := it.Begin(); row != it.End(); row = it.Next() { + c.Assert(row.GetFloat32(0), check.Equals, float32(i)/2) + i++ + } +} + +func (s *testChunkSuite) TestMyDecimal(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeNewDecimal)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + d := new(types.MyDecimal) + if err := d.FromFloat64(float64(i) * 1.1); err != nil { + c.Fatal(err) + } + col.AppendMyDecimal(d) + } + + ds := col.Decimals() + for i := 0; i < 1024; i++ { + d := new(types.MyDecimal) + if err := d.FromFloat64(float64(i) * 1.1); err != nil { + c.Fatal(err) + } + c.Assert(d.Compare(&ds[i]), check.Equals, 0) + + if err := types.DecimalAdd(&ds[i], d, &ds[i]); err != nil { + c.Fatal(err) + } + } + + it := NewIterator4Chunk(chk) + var i int64 + for row := it.Begin(); row != it.End(); row = it.Next() { + d := new(types.MyDecimal) + if err := d.FromFloat64(float64(i) * 1.1 * 2); err != nil { + c.Fatal(err) + } + + delta := new(types.MyDecimal) + if err := types.DecimalSub(d, row.GetMyDecimal(0), delta); err != nil { + c.Fatal(err) + } + + fDelta, err := delta.ToFloat64() + if err != nil { + c.Fatal(err) + } + if fDelta > 0.0001 || fDelta < -0.0001 { + c.Fatal() + } + + i++ + } +} + +func (s *testChunkSuite) TestStringColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeVarString)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendString(fmt.Sprintf("%v", i*i)) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + c.Assert(row.GetString(0), check.Equals, fmt.Sprintf("%v", i*i)) + c.Assert(col.GetString(i), check.Equals, fmt.Sprintf("%v", i*i)) + i++ + } +} + +func (s *testChunkSuite) TestSetColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeSet)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendSet(types.Set{Name: fmt.Sprintf("%v", i), Value: uint64(i)}) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + s1 := col.GetSet(i) + s2 := row.GetSet(0) + c.Assert(s1.Name, check.Equals, s2.Name) + c.Assert(s1.Value, check.Equals, s2.Value) + c.Assert(s1.Name, check.Equals, fmt.Sprintf("%v", i)) + c.Assert(s1.Value, check.Equals, uint64(i)) + i++ + } +} + +func (s *testChunkSuite) TestJSONColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeJSON)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + j := new(json.BinaryJSON) + if err := j.UnmarshalJSON([]byte(fmt.Sprintf(`{"%v":%v}`, i, i))); err != nil { + c.Fatal(err) + } + col.AppendJSON(*j) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + j1 := col.GetJSON(i) + j2 := row.GetJSON(0) + c.Assert(j1.String(), check.Equals, j2.String()) + i++ + } +} + +func (s *testChunkSuite) TestTimeColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeDatetime)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendTime(types.CurrentTime(mysql.TypeDatetime)) + time.Sleep(time.Millisecond / 10) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + j1 := col.GetTime(i) + j2 := row.GetTime(0) + c.Assert(j1.Compare(j2), check.Equals, 0) + i++ + } +} + +func (s *testChunkSuite) TestDurationColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeDuration)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendDuration(types.Duration{Duration: time.Second * time.Duration(i)}) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + j1 := col.GetDuration(i, 0) + j2 := row.GetDuration(0, 0) + c.Assert(j1.Compare(j2), check.Equals, 0) + i++ + } +} + +func (s *testChunkSuite) TestEnumColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeEnum)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + col.AppendEnum(types.Enum{Name: fmt.Sprintf("%v", i), Value: uint64(i)}) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + s1 := col.GetEnum(i) + s2 := row.GetEnum(0) + c.Assert(s1.Name, check.Equals, s2.Name) + c.Assert(s1.Value, check.Equals, s2.Value) + c.Assert(s1.Name, check.Equals, fmt.Sprintf("%v", i)) + c.Assert(s1.Value, check.Equals, uint64(i)) + i++ + } +} + +func (s *testChunkSuite) TestNullsColumn(c *check.C) { + chk := NewChunkWithCapacity([]*types.FieldType{types.NewFieldType(mysql.TypeLonglong)}, 1024) + col := chk.Column(0) + for i := 0; i < 1024; i++ { + if i%2 == 0 { + col.AppendNull() + continue + } + col.AppendInt64(int64(i)) + } + + it := NewIterator4Chunk(chk) + var i int + for row := it.Begin(); row != it.End(); row = it.Next() { + if i%2 == 0 { + c.Assert(row.IsNull(0), check.Equals, true) + c.Assert(col.IsNull(i), check.Equals, true) + } else { + c.Assert(row.GetInt64(0), check.Equals, int64(i)) + } + i++ + } +} diff --git a/util/chunk/mutrow.go b/util/chunk/mutrow.go index aa36326559930..b0bddc18f3029 100644 --- a/util/chunk/mutrow.go +++ b/util/chunk/mutrow.go @@ -203,7 +203,7 @@ func makeMutRowBytesColumn(bin []byte) *Column { func (mr MutRow) SetRow(row Row) { for colIdx, rCol := range row.c.columns { mrCol := mr.c.columns[colIdx] - if rCol.isNull(row.idx) { + if rCol.IsNull(row.idx) { mrCol.nullBitmap[0] = 0 continue } @@ -351,7 +351,7 @@ func setMutRowJSON(col *Column, j json.BinaryJSON) { func (mr MutRow) ShallowCopyPartialRow(colIdx int, row Row) { for i, srcCol := range row.c.columns { dstCol := mr.c.columns[colIdx+i] - if !srcCol.isNull(row.idx) { + if !srcCol.IsNull(row.idx) { // MutRow only contains one row, so we can directly set the whole byte. dstCol.nullBitmap[0] = 1 } else { diff --git a/util/chunk/row.go b/util/chunk/row.go index 0d282558fc0e1..b4c22d6b02210 100644 --- a/util/chunk/row.go +++ b/util/chunk/row.go @@ -220,5 +220,5 @@ func (r Row) GetDatum(colIdx int, tp *types.FieldType) types.Datum { // IsNull returns if the datum in the chunk.Row is null. func (r Row) IsNull(colIdx int) bool { - return r.c.columns[colIdx].isNull(r.idx) + return r.c.columns[colIdx].IsNull(r.idx) }