From 5ecc896a8a540cd84ae3c7537c8f0050718550bb Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 28 Aug 2024 11:33:02 +0800 Subject: [PATCH 1/3] handle the error correctly Signed-off-by: bigsheeper --- go/parquet/file/record_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/parquet/file/record_reader.go b/go/parquet/file/record_reader.go index 667ffca77a8..765f4a9d34b 100755 --- a/go/parquet/file/record_reader.go +++ b/go/parquet/file/record_reader.go @@ -645,7 +645,7 @@ func (rr *recordReader) ReadRecords(numRecords int64) (int64, error) { } } - return recordsRead, nil + return recordsRead, rr.Err() } func (rr *recordReader) ReleaseValidBits() *memory.Buffer { From f0983846aed2b49ff23fbf708d925126e7cb2c3f Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 28 Aug 2024 18:06:06 +0800 Subject: [PATCH 2/3] add ut Signed-off-by: bigsheeper --- go/parquet/file/file_reader_test.go | 49 +++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go index 35f4da4e866..6513534f2ad 100644 --- a/go/parquet/file/file_reader_test.go +++ b/go/parquet/file/file_reader_test.go @@ -452,6 +452,55 @@ func TestRleBooleanEncodingFileRead(t *testing.T) { assert.Equal(t, expected, values[:len(expected)]) } +type mockBadReader struct { + cnt int + reader *os.File +} + +func (m *mockBadReader) Seek(offset int64, whence int) (int64, error) { + return m.reader.Seek(offset, whence) +} + +func (m *mockBadReader) ReadAt(p []byte, off int64) (n int, err error) { + if m.cnt == 0 { + return 0, fmt.Errorf("mock error") + } + m.cnt-- + return m.reader.ReadAt(p, off) +} + +func TestBadReader(t *testing.T) { + dir := os.Getenv("PARQUET_TEST_DATA") + if dir == "" { + t.Skip("no path supplied with PARQUET_TEST_DATA") + } + require.DirExists(t, dir) + + filePath := path.Join(dir, "byte_stream_split_extended.gzip.parquet") + f, err := os.Open(filePath) + assert.NoError(t, err) + defer f.Close() + + reader := &mockBadReader{ + cnt: 2, + reader: f, + } + r, err := file.NewParquetReader(reader, file.WithReadProps(&parquet.ReaderProperties{ + BufferSize: int64(1024), + BufferedStreamEnabled: true, + })) + assert.NoError(t, err) + + fileReader, err := pqarrow.NewFileReader(r, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + assert.NoError(t, err) + + columnReader, err := fileReader.GetColumn(context.Background(), 0) + assert.NoError(t, err) + + _, err = columnReader.NextBatch(1) + assert.Error(t, err) // Expect an error to occur. +} + func TestByteStreamSplitEncodingFileRead(t *testing.T) { dir := os.Getenv("PARQUET_TEST_DATA") if dir == "" { From 524bd32c3b3d9a7e2dba2822afa782d9ec2fd4d9 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Wed, 28 Aug 2024 19:48:46 +0800 Subject: [PATCH 3/3] improve ut Signed-off-by: bigsheeper --- go/parquet/file/file_reader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/parquet/file/file_reader_test.go b/go/parquet/file/file_reader_test.go index 6513534f2ad..74926c958e2 100644 --- a/go/parquet/file/file_reader_test.go +++ b/go/parquet/file/file_reader_test.go @@ -498,7 +498,7 @@ func TestBadReader(t *testing.T) { assert.NoError(t, err) _, err = columnReader.NextBatch(1) - assert.Error(t, err) // Expect an error to occur. + assert.ErrorContains(t, err, "mock error") // Expect an error to occur. } func TestByteStreamSplitEncodingFileRead(t *testing.T) {