From c2a21c45f588338b85fb86d416fc1dffa8a1d8f8 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 1 Oct 2020 08:41:49 -0400 Subject: [PATCH 1/5] ARROW-10134: [Python][Dataset] Add ParquetFileFragment.num_row_groups --- python/pyarrow/_dataset.pyx | 4 ++++ python/pyarrow/tests/test_dataset.py | 11 +++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 3b8621f9709..d9a94bca0d7 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -962,6 +962,10 @@ cdef class ParquetFileFragment(FileFragment): return None return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups] + @property + def num_row_groups(self): + return None if self.row_groups is None else len(self.row_groups) + def split_by_row_group(self, Expression filter=None, Schema schema=None): """ diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index c1aa87d0c47..41e3d89a7af 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -246,13 +246,14 @@ def test_filesystem_dataset(mockfs): assert fragment.path == path assert isinstance(fragment.format, ds.ParquetFileFormat) assert isinstance(fragment, ds.ParquetFileFragment) - assert fragment.row_groups is None + assert fragment.row_groups is fragment.num_row_groups is None row_group_fragments = list(fragment.split_by_row_group()) - assert len(row_group_fragments) == 1 + assert fragment.num_row_groups == len(row_group_fragments) == 1 assert isinstance(row_group_fragments[0], ds.ParquetFileFragment) assert row_group_fragments[0].path == path assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)] + assert row_group_fragments[0].num_row_groups == 1 fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) assert len(fragments) == 2 @@ -605,8 +606,9 @@ def test_make_fragment(multisourcefs): assert isinstance(f, ds.ParquetFileFragment) assert f.path == 
path assert isinstance(f.filesystem, type(multisourcefs)) - assert fragment.row_groups is None + assert fragment.row_groups is fragment.num_row_groups is None assert row_group_fragment.row_groups == [ds.RowGroupInfo(0)] + assert row_group_fragment.num_row_groups == 1 def test_make_csv_fragment_from_buffer(): @@ -807,13 +809,14 @@ def test_fragments_parquet_row_groups(tempdir): # list and scan row group fragments row_group_fragments = list(fragment.split_by_row_group()) - assert len(row_group_fragments) == 2 + assert len(row_group_fragments) == fragment.num_row_groups == 2 result = row_group_fragments[0].to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] assert len(result) == 2 assert result.equals(table.slice(0, 2)) assert row_group_fragments[0].row_groups is not None + assert row_group_fragments[0].num_row_groups == 1 assert row_group_fragments[0].row_groups[0].statistics == { 'f1': {'min': 0, 'max': 1}, 'f2': {'min': 1, 'max': 1}, From 01779e7d5bd3b7cd19f42094162121a4e8f44240 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 5 Oct 2020 10:14:57 -0400 Subject: [PATCH 2/5] load num_row_groups if it is not available --- cpp/src/arrow/dataset/file_parquet.cc | 28 ++++++++++++++++---- cpp/src/arrow/dataset/file_parquet.h | 4 +++ python/pyarrow/_dataset.pyx | 6 ++++- python/pyarrow/includes/libarrow_dataset.pxd | 1 + python/pyarrow/tests/test_dataset.py | 7 +++-- 5 files changed, 38 insertions(+), 8 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index e036c4826c8..160c4b41b5f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -511,7 +511,25 @@ ParquetFileFragment::ParquetFileFragment(FileSource source, row_groups_(std::move(row_groups)), parquet_format_(checked_cast<ParquetFileFormat&>(*format_)), has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) && - physical_schema_ != nullptr) {} + physical_schema_ != nullptr) { + if 
(!row_groups_.empty()) { + // Empty row_groups_ indicates selection of all row groups in the file, so we must + // open a reader to determine the real count. + num_row_groups_ = static_cast<int>(row_groups_.size()); + } +} + +Result<int> ParquetFileFragment::GetNumRowGroups() { + auto lock = physical_schema_mutex_.Lock(); + if (num_row_groups_ == -1) { + ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_)); + num_row_groups_ = reader->num_row_groups(); + if (row_groups_.empty()) { + row_groups_ = RowGroupInfo::FromCount(num_row_groups_); + } + } + return num_row_groups_; +} Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) { if (HasCompleteMetadata()) { @@ -538,17 +556,17 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r physical_schema_ = std::move(schema); std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata(); - int num_row_groups = metadata->num_row_groups(); + num_row_groups_ = metadata->num_row_groups(); if (row_groups_.empty()) { - row_groups_ = RowGroupInfo::FromCount(num_row_groups); + row_groups_ = RowGroupInfo::FromCount(num_row_groups_); } for (const RowGroupInfo& info : row_groups_) { // Ensure RowGroups are indexing valid RowGroups before augmenting. - if (info.id() >= num_row_groups) { + if (info.id() >= num_row_groups_) { return Status::IndexError("Trying to scan row group ", info.id(), " but ", - source_.path(), " only has ", num_row_groups, + source_.path(), " only has ", num_row_groups_, " row groups"); } } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 939fdc53687..576e370a499 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -216,6 +216,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { /// represents all RowGroups in the parquet file. 
const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; } + /// \brief Return the number of row groups selected by this fragment. + Result<int> GetNumRowGroups(); + /// \brief Indicate if the attached statistics are complete and the physical schema /// is cached. /// @@ -244,6 +247,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { std::vector<RowGroupInfo> row_groups_; ParquetFileFormat& parquet_format_; bool has_complete_metadata_; + int num_row_groups_ = -1; friend class ParquetFileFormat; }; diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index d9a94bca0d7..c739f85cb91 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -964,7 +964,11 @@ cdef class ParquetFileFragment(FileFragment): @property def num_row_groups(self): - return None if self.row_groups is None else len(self.row_groups) + """ + Return the number of row groups viewed by this fragment (not the + number of row groups in the origin file). + """ + return GetResultValue(self.parquet_file_fragment.GetNumRowGroups()) def split_by_row_group(self, Expression filter=None, Schema schema=None): diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 3e7ca1d5752..589551aafd1 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -240,6 +240,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"( CFileFragment): const vector[CRowGroupInfo]& row_groups() const + CResult[int] GetNumRowGroups() CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup( shared_ptr[CExpression] predicate) CStatus EnsureCompleteMetadata() diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 41e3d89a7af..1872c182258 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -246,7 +246,8 @@ def 
test_filesystem_dataset(mockfs): assert fragment.path == path assert isinstance(fragment.format, ds.ParquetFileFormat) assert isinstance(fragment, ds.ParquetFileFragment) - assert fragment.row_groups is fragment.num_row_groups is None + assert fragment.row_groups is None + assert fragment.num_row_groups == 1 row_group_fragments = list(fragment.split_by_row_group()) assert fragment.num_row_groups == len(row_group_fragments) == 1 @@ -600,13 +601,15 @@ def test_make_fragment(multisourcefs): for path in dataset.files: fragment = parquet_format.make_fragment(path, multisourcefs) + assert fragment.row_groups is None + assert fragment.num_row_groups == 1 + row_group_fragment = parquet_format.make_fragment(path, multisourcefs, row_groups=[0]) for f in [fragment, row_group_fragment]: assert isinstance(f, ds.ParquetFileFragment) assert f.path == path assert isinstance(f.filesystem, type(multisourcefs)) - assert fragment.row_groups is fragment.num_row_groups is None assert row_group_fragment.row_groups == [ds.RowGroupInfo(0)] assert row_group_fragment.num_row_groups == 1 From 9f5fcd13788133cde33ad1b97c2ae23554a5190a Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 7 Oct 2020 11:04:04 -0400 Subject: [PATCH 3/5] correct setting num_row_groups_ in the event of subselection --- cpp/src/arrow/dataset/file_parquet.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 160c4b41b5f..eaf5ade6570 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -556,9 +556,9 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r physical_schema_ = std::move(schema); std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata(); - num_row_groups_ = metadata->num_row_groups(); if (row_groups_.empty()) { + num_row_groups_ = metadata->num_row_groups(); row_groups_ = RowGroupInfo::FromCount(num_row_groups_); } From 
3a67f3785551ab98e91f80e3c7332f20c02eaa6f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 7 Oct 2020 13:33:45 -0400 Subject: [PATCH 4/5] don't use selected row group count to validate row group ids --- cpp/src/arrow/dataset/file_parquet.cc | 9 +++++---- python/pyarrow/tests/test_dataset.py | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index eaf5ade6570..dd264f511ca 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -556,17 +556,18 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r physical_schema_ = std::move(schema); std::shared_ptr<parquet::FileMetaData> metadata = reader->parquet_reader()->metadata(); + int num_row_groups = metadata->num_row_groups(); if (row_groups_.empty()) { - num_row_groups_ = metadata->num_row_groups(); - row_groups_ = RowGroupInfo::FromCount(num_row_groups_); + num_row_groups_ = num_row_groups; + row_groups_ = RowGroupInfo::FromCount(num_row_groups); } for (const RowGroupInfo& info : row_groups_) { // Ensure RowGroups are indexing valid RowGroups before augmenting. 
- if (info.id() >= num_row_groups_) { + if (info.id() >= num_row_groups) { return Status::IndexError("Trying to scan row group ", info.id(), " but ", - source_.path(), " only has ", num_row_groups_, + source_.path(), " only has ", num_row_groups, " row groups"); } } diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 1872c182258..56774f6caf3 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -832,6 +832,26 @@ def test_fragments_parquet_row_groups(tempdir): assert len(result) == 1 +@pytest.mark.parquet +def test_parquet_fragment_num_row_groups(tempdir): + import pyarrow.parquet as pq + + table = pa.table({'a': range(8)}) + pq.write_table(table, tempdir / "test.parquet", row_group_size=2) + dataset = ds.dataset(tempdir / "test.parquet", format="parquet") + original_fragment = list(dataset.get_fragments())[0] + + # create fragment with subset of row groups + fragment = original_fragment.format.make_fragment( + original_fragment.path, original_fragment.filesystem, + row_groups=[1, 3]) + assert fragment.num_row_groups == 2 + # ensure that parsing metadata preserves correct number of row groups + fragment.ensure_complete_metadata() + assert fragment.num_row_groups == 2 + assert len(fragment.row_groups) == 2 + + @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups_dictionary(tempdir): From caadfbd2655f896c8d6415fea2dc71d9eb19e580 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 7 Oct 2020 16:44:24 -0400 Subject: [PATCH 5/5] lint fix --- python/pyarrow/tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 56774f6caf3..371f5d8b460 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -844,7 +844,7 @@ def test_parquet_fragment_num_row_groups(tempdir): # create fragment with subset of row groups 
fragment = original_fragment.format.make_fragment( original_fragment.path, original_fragment.filesystem, - row_groups=[1, 3]) + row_groups=[1, 3]) assert fragment.num_row_groups == 2 # ensure that parsing metadata preserves correct number of row groups fragment.ensure_complete_metadata()