From c7fafee4110c7f04b54d4b4a897e0b0457f90747 Mon Sep 17 00:00:00 2001
From: Anthony De Bortoli
Date: Wed, 7 Aug 2024 16:00:45 +0200
Subject: [PATCH 1/4] [Go][Parquet] Recover from panic in file reader

Each reader goroutine started by ReadRowGroups now recovers from a panic
raised while reading and reports it as an error on the results channel.

The recover handler is deferred after wg.Done. Deferred calls run in LIFO
order, so the error is sent on results before the worker is counted as
finished.

---
 go/parquet/pqarrow/file_reader.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go
index a2e84d9ce27..d357fb24033 100755
--- a/go/parquet/pqarrow/file_reader.go
+++ b/go/parquet/pqarrow/file_reader.go
@@ -28,6 +28,7 @@ import (
 	"github.com/apache/arrow/go/v18/arrow/array"
 	"github.com/apache/arrow/go/v18/arrow/arrio"
 	"github.com/apache/arrow/go/v18/arrow/memory"
+	"github.com/apache/arrow/go/v18/internal/utils"
 	"github.com/apache/arrow/go/v18/parquet"
 	"github.com/apache/arrow/go/v18/parquet/file"
 	"github.com/apache/arrow/go/v18/parquet/schema"
@@ -331,6 +332,15 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in
 	wg.Add(np) // fan-out to np readers
 	for i := 0; i < np; i++ {
 		go func() {
 			defer wg.Done()
+			// Deferred after wg.Done so that it runs first (defers are LIFO):
+			// the recovered panic is sent on results before this worker is
+			// counted as finished.
+			defer func() {
+				if pErr := recover(); pErr != nil {
+					err := utils.FormatRecoveredError("panic while reading", pErr)
+					results <- resultPair{err: err}
+				}
+			}()
 			for {
 				select {

From 3d67d5f2dd18ca44e5667a5bbccd95dac5bfe86b Mon Sep 17 00:00:00 2001
From: Anthony De Bortoli
Date: Thu, 8 Aug 2024 14:22:57 +0200
Subject: [PATCH 2/4] Add unit test

Read GH_43605.parquet from the parquet-testing bad_data directory, located
through the new PARQUET_TEST_BAD_DATA environment variable, and expect
ReadTable to return an error. The test is skipped when the variable is unset.

Failures while opening the file or building the Arrow reader abort the test
with t.Fatalf so that the following steps never run against a reader that
could not be created.

---
 ci/scripts/go_test.sh                  |  1 +
 go/parquet/pqarrow/file_reader_test.go | 41 ++++++++++++++++++++++++++
 2 files changed, 42 insertions(+)

diff --git a/ci/scripts/go_test.sh b/ci/scripts/go_test.sh
index bad2ffe6190..45ea513b1ec 100755
--- a/ci/scripts/go_test.sh
+++ b/ci/scripts/go_test.sh
@@ -78,6 +78,7 @@ go test $testargs -tags $TAGS,noasm ./...
 popd
 
 export PARQUET_TEST_DATA=${1}/cpp/submodules/parquet-testing/data
+export PARQUET_TEST_BAD_DATA=${1}/cpp/submodules/parquet-testing/bad_data
 export ARROW_TEST_DATA=${1}/testing/data
 
 pushd ${source_dir}/parquet
diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go
index fe5a4547a77..15ea4f83c0e 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"path"
 	"path/filepath"
 	"strings"
 	"testing"
@@ -373,3 +374,43 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) {
 		assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields()))
 	}
 }
+
+func TestReadParquetFile(t *testing.T) {
+	dir := os.Getenv("PARQUET_TEST_BAD_DATA")
+	if dir == "" {
+		t.Skip("no path supplied with PARQUET_TEST_BAD_DATA")
+	}
+	assert.DirExists(t, dir)
+	filename := path.Join(dir, "GH_43605.parquet")
+	ctx := context.TODO()
+
+	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+
+	rdr, err := file.OpenParquetFile(
+		filename,
+		false,
+		file.WithReadProps(parquet.NewReaderProperties(mem)),
+	)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	defer func() {
+		if err2 := rdr.Close(); err2 != nil {
+			t.Errorf("unexpected error: %v", err2)
+		}
+	}()
+
+	arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
+		Parallel:  false,
+		BatchSize: 0,
+	}, mem)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	_, err = arrowRdr.ReadTable(ctx)
+
+	if err == nil {
+		t.Errorf("expected error: %v", err)
+	}
+}

From 2bcbdace898e72f4124aa3c25679821c61fef76f Mon Sep 17 00:00:00 2001
From: Anthony De Bortoli
Date: Mon, 26 Aug 2024 10:31:41 +0200
Subject: [PATCH 3/4] Update parquet-testing submodule

Bump the parquet-testing submodule and use the test file under its new name,
ARROW-GH-43605.parquet.

---
 cpp/submodules/parquet-testing         | 2 +-
 go/parquet/pqarrow/file_reader_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing
index cb7a9674142..50af3d8ce20 160000
--- a/cpp/submodules/parquet-testing
+++ b/cpp/submodules/parquet-testing
@@ -1 +1 @@
-Subproject commit cb7a9674142c137367bf75a01b79c6e214a73199
+Subproject commit 50af3d8ce206990d81014b1862e5ce7380dc3e08
diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go
index 15ea4f83c0e..6e07eb7a89c 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -381,7 +381,7 @@ func TestReadParquetFile(t *testing.T) {
 		t.Skip("no path supplied with PARQUET_TEST_BAD_DATA")
 	}
 	assert.DirExists(t, dir)
-	filename := path.Join(dir, "GH_43605.parquet")
+	filename := path.Join(dir, "ARROW-GH-43605.parquet")
 	ctx := context.TODO()
 
 	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)

From 07de915f9087de12cb308bfd631d9b8e4cd806aa Mon Sep 17 00:00:00 2001
From: Anthony De Bortoli
Date: Thu, 8 Aug 2024 15:20:28 +0200
Subject: [PATCH 4/4] [Go][Parquet] unpack32Avx2 returns 0 if n is equal to 0

unpack32Avx2 now returns 0 as soon as the computed byte count n is 0, before
taking a buffer from the pool.

The regression test switches to the parallel reader and expects the read to
succeed: one column of 21186 rows whose values, read as uint16, are all 0.
The ReadTable error is checked with t.Fatalf before table.Release is
deferred, so a failed read is reported as a test failure instead of a nil
dereference. assert.Equal calls pass the expected value first.

---
 .../internal/utils/bit_packing_avx2_amd64.go |  4 ++++
 go/parquet/pqarrow/file_reader_test.go        | 21 ++++++++++++++++-----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/go/parquet/internal/utils/bit_packing_avx2_amd64.go b/go/parquet/internal/utils/bit_packing_avx2_amd64.go
index 0455ccc505b..53c114ebf2e 100644
--- a/go/parquet/internal/utils/bit_packing_avx2_amd64.go
+++ b/go/parquet/internal/utils/bit_packing_avx2_amd64.go
@@ -39,6 +39,10 @@ func unpack32Avx2(in io.Reader, out []uint32, nbits int) int {
 
 	n := batch * nbits / 8
 
+	if n == 0 {
+		return 0
+	}
+
 	buffer := bufferPool.Get().(*bytes.Buffer)
 	defer bufferPool.Put(buffer)
 	buffer.Reset()
diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go
index 6e07eb7a89c..815a4ab4905 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -401,16 +401,27 @@ func TestReadParquetFile(t *testing.T) {
 	}()
 
 	arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{
-		Parallel:  false,
+		Parallel:  true,
 		BatchSize: 0,
 	}, mem)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
 	}
 
-	_, err = arrowRdr.ReadTable(ctx)
-
-	if err == nil {
-		t.Errorf("expected error: %v", err)
+	table, err := arrowRdr.ReadTable(ctx)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	defer table.Release()
+
+	assert.Equal(t, int64(1), table.NumCols())
+	assert.Equal(t, int64(21186), table.NumRows())
+
+	for i := 0; i < int(table.NumCols()); i++ {
+		col := table.Column(i)
+		assert.Equal(t, int(table.NumRows()), col.Len())
+		for j := 0; j < col.Len(); j++ {
+			assert.Equal(t, uint16(0), col.Data().Chunk(0).(*array.Uint16).Value(j))
+		}
 	}
 }