From 9f0d3177b8278fc311a931e7dffa9f82ea582d34 Mon Sep 17 00:00:00 2001 From: Anthony De Bortoli Date: Wed, 7 Aug 2024 16:00:45 +0200 Subject: [PATCH 1/6] [Go][Parquet] Recover from panic in file reader --- go/parquet/pqarrow/file_reader.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go index a2e84d9ce27..d357fb24033 100755 --- a/go/parquet/pqarrow/file_reader.go +++ b/go/parquet/pqarrow/file_reader.go @@ -28,6 +28,7 @@ import ( "github.com/apache/arrow/go/v18/arrow/array" "github.com/apache/arrow/go/v18/arrow/arrio" "github.com/apache/arrow/go/v18/arrow/memory" + "github.com/apache/arrow/go/v18/internal/utils" "github.com/apache/arrow/go/v18/parquet" "github.com/apache/arrow/go/v18/parquet/file" "github.com/apache/arrow/go/v18/parquet/schema" @@ -331,6 +332,12 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in wg.Add(np) // fan-out to np readers for i := 0; i < np; i++ { go func() { + defer func() { + if pErr := recover(); pErr != nil { + err := utils.FormatRecoveredError("panic while reading", pErr) + results <- resultPair{err: err} + } + }() defer wg.Done() for { select { From 72b71f79e206cc6bdde4efe44b3b63bf0a1d24bf Mon Sep 17 00:00:00 2001 From: Anthony De Bortoli Date: Thu, 8 Aug 2024 14:22:57 +0200 Subject: [PATCH 2/6] Add unit test --- ci/scripts/go_test.sh | 1 + go/parquet/pqarrow/file_reader_test.go | 41 ++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/ci/scripts/go_test.sh b/ci/scripts/go_test.sh index bad2ffe6190..45ea513b1ec 100755 --- a/ci/scripts/go_test.sh +++ b/ci/scripts/go_test.sh @@ -78,6 +78,7 @@ go test $testargs -tags $TAGS,noasm ./... popd export PARQUET_TEST_DATA=${1}/cpp/submodules/parquet-testing/data +export PARQUET_TEST_BAD_DATA=${1}/cpp/submodules/parquet-testing/bad_data export ARROW_TEST_DATA=${1}/testing/data pushd ${source_dir}/parquet diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go index fe5a4547a77..15ea4f83c0e 100644 --- a/go/parquet/pqarrow/file_reader_test.go +++ b/go/parquet/pqarrow/file_reader_test.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "strings" "testing" @@ -373,3 +374,43 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) { assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields())) } } + +func TestReadParquetFile(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_BAD_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_BAD_DATA") + } + assert.DirExists(t, dir) + filename := path.Join(dir, "GH_43605.parquet") + ctx := context.TODO() + + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + + rdr, err := file.OpenParquetFile( + filename, + false, + file.WithReadProps(parquet.NewReaderProperties(mem)), + ) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + defer func() { + if err2 := rdr.Close(); err2 != nil { + t.Errorf("unexpected error: %v", err2) + } + }() + + arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{ + Parallel: false, + BatchSize: 0, + }, mem) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + _, err = arrowRdr.ReadTable(ctx) + + if err == nil { + t.Errorf("expected error: %v", err) + } +} From 654c8393922aa71cb6a173af788084e83bd6a192 Mon Sep 17 00:00:00 2001 From: Anthony De Bortoli Date: Mon, 26 Aug 2024 10:31:41 +0200 Subject: [PATCH 3/6] Update parquet-testing submodule --- cpp/submodules/parquet-testing | 2 +- go/parquet/pqarrow/file_reader_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index cb7a9674142..50af3d8ce20 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit cb7a9674142c137367bf75a01b79c6e214a73199 +Subproject commit 50af3d8ce206990d81014b1862e5ce7380dc3e08 diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go index 15ea4f83c0e..6e07eb7a89c 100644 --- a/go/parquet/pqarrow/file_reader_test.go +++ b/go/parquet/pqarrow/file_reader_test.go @@ -381,7 +381,7 @@ func TestReadParquetFile(t *testing.T) { t.Skip("no path supplied with PARQUET_TEST_BAD_DATA") } assert.DirExists(t, dir) - filename := path.Join(dir, "GH_43605.parquet") + filename := path.Join(dir, "ARROW-GH-43605.parquet") ctx := context.TODO() mem := memory.NewCheckedAllocator(memory.DefaultAllocator) From 0cedaffb6ae333a31cdebdaa2b02fd5fe69437ea Mon Sep 17 00:00:00 2001 From: Anthony De Bortoli Date: Thu, 29 Aug 2024 08:23:42 +0200 Subject: [PATCH 4/6] Simplify test conditions --- go/parquet/pqarrow/file_reader_test.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go index 6e07eb7a89c..0e607589116 100644 --- a/go/parquet/pqarrow/file_reader_test.go +++ b/go/parquet/pqarrow/file_reader_test.go @@ -391,9 +391,7 @@ func TestReadParquetFile(t *testing.T) { false, file.WithReadProps(parquet.NewReaderProperties(mem)), ) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + require.NoError(t, err) defer func() { if err2 := rdr.Close(); err2 != nil { t.Errorf("unexpected error: %v", err2) @@ -404,13 +402,9 @@ func TestReadParquetFile(t *testing.T) { Parallel: false, BatchSize: 0, }, mem) - if err != nil { - t.Errorf("unexpected error: %v", err) - } + require.NoError(t, err) _, err = arrowRdr.ReadTable(ctx) - if err == nil { - t.Errorf("expected error: %v", err) - } + assert.Error(t, err) } From 589c3e4df00ca906c65830fa4e90ca182f424276 Mon Sep 17 00:00:00 2001 From: Anthony De Bortoli Date: Thu, 29 Aug 2024 09:01:02 +0200 Subject: [PATCH 5/6] Order defer functions properly Deferred function calls are executed in Last In First Out order after the surrounding function returns: https://go.dev/blog/defer-panic-and-recover --- go/parquet/pqarrow/file_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go index d357fb24033..d576b749d28 100755 --- a/go/parquet/pqarrow/file_reader.go +++ b/go/parquet/pqarrow/file_reader.go @@ -332,13 +332,13 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in wg.Add(np) // fan-out to np readers for i := 0; i < np; i++ { go func() { + defer wg.Done() defer func() { if pErr := recover(); pErr != nil { err := utils.FormatRecoveredError("panic while reading", pErr) results <- resultPair{err: err} } }() - defer wg.Done() for { select { case r, ok := <-ch: From 8edfb46a866f645e74a119185886c0d1c4759aba Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Thu, 12 Sep 2024 12:38:57 -0400 Subject: [PATCH 6/6] fix inconsistency when using noasm tag --- go/parquet/internal/utils/bit_packing_avx2_amd64.go | 5 ++--- go/parquet/internal/utils/bit_packing_neon_arm64.go | 5 ++--- go/parquet/pqarrow/file_reader_test.go | 3 +-- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/go/parquet/internal/utils/bit_packing_avx2_amd64.go b/go/parquet/internal/utils/bit_packing_avx2_amd64.go index 0455ccc505b..5f1923fac2f 100644 --- a/go/parquet/internal/utils/bit_packing_avx2_amd64.go +++ b/go/parquet/internal/utils/bit_packing_avx2_amd64.go @@ -33,12 +33,11 @@ func _unpack32_avx2(in, out unsafe.Pointer, batchSize, nbits int) (num int) func unpack32Avx2(in io.Reader, out []uint32, nbits int) int { batch := len(out) / 32 * 32 - if batch <= 0 { + n := batch * nbits / 8 + if n <= 0 { return 0 } - n := batch * nbits / 8 - buffer := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buffer) buffer.Reset() diff --git a/go/parquet/internal/utils/bit_packing_neon_arm64.go b/go/parquet/internal/utils/bit_packing_neon_arm64.go index 09154e3e4b7..580f9a1f27e 100755 --- a/go/parquet/internal/utils/bit_packing_neon_arm64.go +++ b/go/parquet/internal/utils/bit_packing_neon_arm64.go @@ -33,12 +33,11 @@ func _unpack32_neon(in, out unsafe.Pointer, batchSize, nbits int) (num int) func unpack32NEON(in io.Reader, out []uint32, nbits int) int { batch := len(out) / 32 * 32 - if batch <= 0 { + n := batch * nbits / 8 + if n <= 0 { return 0 } - n := batch * nbits / 8 - buffer := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buffer) buffer.Reset() diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go index 0e607589116..f2ad14859c7 100644 --- a/go/parquet/pqarrow/file_reader_test.go +++ b/go/parquet/pqarrow/file_reader_test.go @@ -405,6 +405,5 @@ func TestReadParquetFile(t *testing.T) { require.NoError(t, err) _, err = arrowRdr.ReadTable(ctx) - - assert.Error(t, err) + assert.NoError(t, err) }