From b8a24d611c370e21f87ae95afffe235a4b05e336 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Wed, 7 May 2025 17:37:42 +0200 Subject: [PATCH 1/5] add TestDeltaByteArray test This exact test seems to be passing on v17 version. More investigation is needed --- parquet/file/file_reader_test.go | 51 ++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go index a850fab7..8706235d 100644 --- a/parquet/file/file_reader_test.go +++ b/parquet/file/file_reader_test.go @@ -21,6 +21,7 @@ import ( "context" "crypto/rand" "encoding/binary" + "encoding/csv" "fmt" "io" "os" @@ -820,3 +821,53 @@ func TestLZ4RawLargerFileRead(t *testing.T) { } require.Equal(t, expectedValsHead, vals[:len(expectedValsHead)]) } + +func TestDeltaByteArray(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_DATA") + } + require.DirExists(t, dir) + + expected, err := os.ReadFile(path.Join(dir, "delta_byte_array_expect.csv")) + require.NoError(t, err) + csvReader := csv.NewReader(bytes.NewReader(expected)) + + records, err := csvReader.ReadAll() + require.NoError(t, err) + + records = records[1:] // skip header + + props := parquet.NewReaderProperties(memory.DefaultAllocator) + fileReader, err := file.OpenParquetFile(path.Join(dir, "delta_byte_array.parquet"), + false, file.WithReadProps(props)) + require.NoError(t, err) + defer fileReader.Close() + + arrowReader, err := pqarrow.NewFileReader( + fileReader, + pqarrow.ArrowReadProperties{BatchSize: 1024}, + memory.DefaultAllocator, + ) + require.NoError(t, err) + + rr, err := arrowReader.GetRecordReader(context.Background(), nil, nil) + require.NoError(t, err) + defer rr.Release() + + for rr.Next() { + rec := rr.Record() + defer rec.Release() + + for i := 0; i < int(rec.NumCols()); i++ { + vals := rec.Column(i) + for j := 0; j < vals.Len(); j++ { + if vals.IsNull(j) { + require.Equal(t, records[j][i], "") + continue + } + require.Equal(t, records[j][i], vals.ValueStr(j)) + } + } + } +} From 8923842efd043751e3ba7c841241f70568162d17 Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Fri, 9 May 2025 10:52:54 +0200 Subject: [PATCH 2/5] fix: use totalValues instead of nvals I'm unsure about what the difference is, but by looking at version v17 code, I noticed a difference between Int32 and Int64 decoder. Int32 used totalValues and Int64 used nvals. Looks like when merging implementations, the Int64 approach was used. Which seems to not be working properly for Int32. This change at least seems to be able to read the test file, which makes me think it is the right approach. --- parquet/internal/encoding/delta_bit_packing.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/internal/encoding/delta_bit_packing.go b/parquet/internal/encoding/delta_bit_packing.go index a57b12c1..44b40b55 100644 --- a/parquet/internal/encoding/delta_bit_packing.go +++ b/parquet/internal/encoding/delta_bit_packing.go @@ -197,7 +197,7 @@ func (d *deltaBitPackDecoder[T]) Discard(n int) (int, error) { // Decode retrieves min(remaining values, len(out)) values from the data and returns the number // of values actually decoded and any errors encountered. func (d *deltaBitPackDecoder[T]) Decode(out []T) (int, error) { - max := shared_utils.Min(len(out), int(d.nvals)) + max := shared_utils.Min(len(out), int(d.totalValues)) if max == 0 { return 0, nil } From 0ddec7b5227aa1fede5784e1f73bce0fb833951a Mon Sep 17 00:00:00 2001 From: Victor Perez Date: Fri, 9 May 2025 11:04:53 +0200 Subject: [PATCH 3/5] Ensure CSV has the same number of rows as the Parquet file --- parquet/file/file_reader_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go index 8706235d..de214847 100644 --- a/parquet/file/file_reader_test.go +++ b/parquet/file/file_reader_test.go @@ -844,6 +844,9 @@ func TestDeltaByteArray(t *testing.T) { require.NoError(t, err) defer fileReader.Close() + nrows := fileReader.MetaData().NumRows + assert.Equal(t, nrows, int64(len(records)), "expected %d rows, got %d", len(records), nrows) + arrowReader, err := pqarrow.NewFileReader( fileReader, pqarrow.ArrowReadProperties{BatchSize: 1024}, From a7dd7322840b3f7d51500d164af0cbb30b109c6b Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 9 May 2025 16:35:28 -0400 Subject: [PATCH 4/5] better fix, without the test failures --- parquet/internal/encoding/delta_bit_packing.go | 3 ++- parquet/internal/encoding/delta_byte_array.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/parquet/internal/encoding/delta_bit_packing.go b/parquet/internal/encoding/delta_bit_packing.go index 44b40b55..9f43fc88 100644 --- a/parquet/internal/encoding/delta_bit_packing.go +++ b/parquet/internal/encoding/delta_bit_packing.go @@ -101,6 +101,7 @@ func (d *deltaBitPackDecoder[T]) SetData(nvalues int, data []byte) error { d.valsPerMini = uint32(d.blockSize / d.miniBlocksPerBlock) d.usedFirst = false + d.nvals = int(d.totalValues) return nil } @@ -197,7 +198,7 @@ func (d *deltaBitPackDecoder[T]) Discard(n int) (int, error) { // Decode retrieves min(remaining values, len(out)) values from the data and returns the number // of values actually decoded and any errors encountered. func (d *deltaBitPackDecoder[T]) Decode(out []T) (int, error) { - max := shared_utils.Min(len(out), int(d.totalValues)) + max := shared_utils.Min(len(out), int(d.nvals)) if max == 0 { return 0, nil } diff --git a/parquet/internal/encoding/delta_byte_array.go b/parquet/internal/encoding/delta_byte_array.go index bb2134a8..580c83d3 100644 --- a/parquet/internal/encoding/delta_byte_array.go +++ b/parquet/internal/encoding/delta_byte_array.go @@ -171,7 +171,7 @@ func (d *DeltaByteArrayDecoder) SetData(nvalues int, data []byte) error { return err } - d.prefixLengths = make([]int32, nvalues) + d.prefixLengths = make([]int32, prefixLenDec.ValuesLeft()) // decode all the prefix lengths first so we know how many bytes it took to get the // prefix lengths for nvalues prefixLenDec.Decode(d.prefixLengths) From ffd5d4c1f1c62663bb0426119aff733e0562c650 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 9 May 2025 17:26:37 -0400 Subject: [PATCH 5/5] unnecessary release --- parquet/file/file_reader_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go index de214847..d15b4df9 100644 --- a/parquet/file/file_reader_test.go +++ b/parquet/file/file_reader_test.go @@ -860,11 +860,9 @@ func TestDeltaByteArray(t *testing.T) { for rr.Next() { rec := rr.Record() - defer rec.Release() - - for i := 0; i < int(rec.NumCols()); i++ { + for i := range int(rec.NumCols()) { vals := rec.Column(i) - for j := 0; j < vals.Len(); j++ { + for j := range vals.Len() { if vals.IsNull(j) { require.Equal(t, records[j][i], "") continue