diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go index a850fab7..d15b4df9 100644 --- a/parquet/file/file_reader_test.go +++ b/parquet/file/file_reader_test.go @@ -21,6 +21,7 @@ import ( "context" "crypto/rand" "encoding/binary" + "encoding/csv" "fmt" "io" "os" @@ -820,3 +821,54 @@ func TestLZ4RawLargerFileRead(t *testing.T) { } require.Equal(t, expectedValsHead, vals[:len(expectedValsHead)]) } + +func TestDeltaByteArray(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_DATA") + } + require.DirExists(t, dir) + + expected, err := os.ReadFile(path.Join(dir, "delta_byte_array_expect.csv")) + require.NoError(t, err) + csvReader := csv.NewReader(bytes.NewReader(expected)) + + records, err := csvReader.ReadAll() + require.NoError(t, err) + + records = records[1:] // skip header + + props := parquet.NewReaderProperties(memory.DefaultAllocator) + fileReader, err := file.OpenParquetFile(path.Join(dir, "delta_byte_array.parquet"), + false, file.WithReadProps(props)) + require.NoError(t, err) + defer fileReader.Close() + + nrows := fileReader.MetaData().NumRows + assert.Equal(t, nrows, int64(len(records)), "expected %d rows, got %d", len(records), nrows) + + arrowReader, err := pqarrow.NewFileReader( + fileReader, + pqarrow.ArrowReadProperties{BatchSize: 1024}, + memory.DefaultAllocator, + ) + require.NoError(t, err) + + rr, err := arrowReader.GetRecordReader(context.Background(), nil, nil) + require.NoError(t, err) + defer rr.Release() + + for rr.Next() { + rec := rr.Record() + for i := range int(rec.NumCols()) { + vals := rec.Column(i) + for j := range vals.Len() { + if vals.IsNull(j) { + require.Equal(t, records[j][i], "") + continue + } + require.Equal(t, records[j][i], vals.ValueStr(j)) + } + } + } +} diff --git a/parquet/internal/encoding/delta_bit_packing.go b/parquet/internal/encoding/delta_bit_packing.go index a57b12c1..9f43fc88 100644 --- a/parquet/internal/encoding/delta_bit_packing.go +++ b/parquet/internal/encoding/delta_bit_packing.go @@ -101,6 +101,7 @@ func (d *deltaBitPackDecoder[T]) SetData(nvalues int, data []byte) error { d.valsPerMini = uint32(d.blockSize / d.miniBlocksPerBlock) d.usedFirst = false + d.nvals = int(d.totalValues) return nil } diff --git a/parquet/internal/encoding/delta_byte_array.go b/parquet/internal/encoding/delta_byte_array.go index bb2134a8..580c83d3 100644 --- a/parquet/internal/encoding/delta_byte_array.go +++ b/parquet/internal/encoding/delta_byte_array.go @@ -171,7 +171,7 @@ func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error { return err } - d.prefixLengths = make([]int32, nvalues) + d.prefixLengths = make([]int32, prefixLenDec.ValuesLeft()) // decode all the prefix lengths first so we know how many bytes it took to get the // prefix lengths for nvalues prefixLenDec.Decode(d.prefixLengths)