diff --git a/ci/scripts/go_test.sh b/ci/scripts/go_test.sh index bad2ffe6190..45ea513b1ec 100755 --- a/ci/scripts/go_test.sh +++ b/ci/scripts/go_test.sh @@ -78,6 +78,7 @@ go test $testargs -tags $TAGS,noasm ./... popd export PARQUET_TEST_DATA=${1}/cpp/submodules/parquet-testing/data +export PARQUET_TEST_BAD_DATA=${1}/cpp/submodules/parquet-testing/bad_data export ARROW_TEST_DATA=${1}/testing/data pushd ${source_dir}/parquet diff --git a/cpp/submodules/parquet-testing b/cpp/submodules/parquet-testing index cb7a9674142..50af3d8ce20 160000 --- a/cpp/submodules/parquet-testing +++ b/cpp/submodules/parquet-testing @@ -1 +1 @@ -Subproject commit cb7a9674142c137367bf75a01b79c6e214a73199 +Subproject commit 50af3d8ce206990d81014b1862e5ce7380dc3e08 diff --git a/go/parquet/internal/utils/bit_packing_avx2_amd64.go b/go/parquet/internal/utils/bit_packing_avx2_amd64.go index 0455ccc505b..53c114ebf2e 100644 --- a/go/parquet/internal/utils/bit_packing_avx2_amd64.go +++ b/go/parquet/internal/utils/bit_packing_avx2_amd64.go @@ -39,6 +39,10 @@ func unpack32Avx2(in io.Reader, out []uint32, nbits int) int { n := batch * nbits / 8 + if n == 0 { + return 0 + } + buffer := bufferPool.Get().(*bytes.Buffer) defer bufferPool.Put(buffer) buffer.Reset() diff --git a/go/parquet/pqarrow/file_reader.go b/go/parquet/pqarrow/file_reader.go index a2e84d9ce27..d357fb24033 100755 --- a/go/parquet/pqarrow/file_reader.go +++ b/go/parquet/pqarrow/file_reader.go @@ -28,6 +28,7 @@ import ( "github.com/apache/arrow/go/v18/arrow/array" "github.com/apache/arrow/go/v18/arrow/arrio" "github.com/apache/arrow/go/v18/arrow/memory" + "github.com/apache/arrow/go/v18/internal/utils" "github.com/apache/arrow/go/v18/parquet" "github.com/apache/arrow/go/v18/parquet/file" "github.com/apache/arrow/go/v18/parquet/schema" @@ -331,6 +332,12 @@ func (fr *FileReader) ReadRowGroups(ctx context.Context, indices, rowGroups []in wg.Add(np) // fan-out to np readers for i := 0; i < np; i++ { go func() { + defer func() { + if pErr := recover(); pErr != nil { + err := utils.FormatRecoveredError("panic while reading", pErr) + results <- resultPair{err: err} + } + }() defer wg.Done() for { select { diff --git a/go/parquet/pqarrow/file_reader_test.go b/go/parquet/pqarrow/file_reader_test.go index fe5a4547a77..815a4ab4905 100644 --- a/go/parquet/pqarrow/file_reader_test.go +++ b/go/parquet/pqarrow/file_reader_test.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "strings" "testing" @@ -373,3 +374,55 @@ func TestFileReaderColumnChunkBoundsErrors(t *testing.T) { assert.ErrorContains(t, tooHighErr, fmt.Sprintf("there are only %d columns", schema.NumFields())) } } + +func TestReadParquetFile(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_BAD_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_BAD_DATA") + } + assert.DirExists(t, dir) + filename := path.Join(dir, "ARROW-GH-43605.parquet") + ctx := context.TODO() + + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + + rdr, err := file.OpenParquetFile( + filename, + false, + file.WithReadProps(parquet.NewReaderProperties(mem)), + ) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + defer func() { + if err2 := rdr.Close(); err2 != nil { + t.Errorf("unexpected error: %v", err2) + } + }() + + arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{ + Parallel: true, + BatchSize: 0, + }, mem) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + table, err := arrowRdr.ReadTable(ctx) + defer table.Release() + + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + assert.Equal(t, table.NumCols(), int64(1)) + assert.Equal(t, table.NumRows(), int64(21186)) + + for i := 0; i < int(table.NumCols()); i++ { + col := table.Column(i) + assert.Equal(t, col.Len(), int(table.NumRows())) + for j := 0; j < col.Len(); j++ { + assert.Equal(t, col.Data().Chunk(0).(*array.Uint16).Value(j), uint16(0)) + } + } +}