From 81aecfadb4b500aa9ddd37fcb297c1c8e4588088 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 1 Jul 2020 17:07:50 +0200 Subject: [PATCH 1/4] ARROW-9288: [C++][Dataset] Fix PartitioningFactory with dictionary encoding for HivePartioning --- cpp/src/arrow/dataset/partition.cc | 26 +++++++++++++++++++++++-- python/pyarrow/tests/test_dataset.py | 29 ++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 4631b4f02e5..14e534a2366 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -306,6 +306,17 @@ class KeyValuePartitioningInspectImpl { return ::arrow::schema(std::move(fields)); } + std::vector FieldNames() { + // only gives sensible result after Finish has been called + std::vector names; + names.reserve(name_to_index_.size()); + + for (auto kv : name_to_index_) { + names.push_back(kv.first); + } + return names; + } + private: std::unordered_map name_to_index_; std::vector> values_; @@ -646,15 +657,26 @@ class HivePartitioningFactory : public PartitioningFactory { } } - return impl.Finish(&dictionaries_); + auto schema_result = impl.Finish(&dictionaries_); + field_names_ = impl.FieldNames(); + return schema_result; } Result> Finish( const std::shared_ptr& schema) const override { - return std::shared_ptr(new HivePartitioning(schema, dictionaries_)); + for (FieldRef ref : field_names_) { + // ensure all of field_names_ are present in schema + RETURN_NOT_OK(ref.FindOne(*schema).status()); + } + + // drop fields which aren't in field_names_ + auto out_schema = SchemaFromColumnNames(schema, field_names_); + + return std::make_shared(std::move(out_schema), dictionaries_); } private: + std::vector field_names_; ArrayVector dictionaries_; PartitioningFactoryOptions options_; }; diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index e71bb7cee40..b7200ed497c 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -1303,6 +1303,35 @@ def test_open_dataset_non_existing_file(): ds.dataset('file:i-am-not-existing.parquet', format='parquet') +@pytest.mark.parquet +@pytest.mark.parametrize('partitioning', ["directory", "hive"]) +def test_open_dataset_partitioned_dictionary_type(tempdir, partitioning): + # ARROW-9288 + import pyarrow.parquet as pq + table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) + + path = tempdir / "dataset" + path.mkdir() + + for part in ["A", "B", "C"]: + fmt = "{}" if partitioning == "directory" else "part={}" + part = path / fmt.format(part) + part.mkdir() + pq.write_table(table, part / "test.parquet") + + if partitioning == "directory": + part = ds.DirectoryPartitioning.discover( + ["part"], max_partition_dictionary_size=-1) + else: + part = ds.HivePartitioning.discover(max_partition_dictionary_size=-1) + + dataset = ds.dataset(str(path), partitioning=part) + expected_schema = table.schema.append( + pa.field("part", pa.dictionary(pa.int32(), pa.string())) + ) + assert dataset.schema.equals(expected_schema) + + @pytest.fixture def s3_example_simple(s3_connection, s3_server): from pyarrow.fs import FileSystem From bb32e8b73245f4a2a0122e0fb5d4d58f02b5ac53 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 8 Jul 2020 22:05:35 +0200 Subject: [PATCH 2/4] Update cpp/src/arrow/dataset/partition.cc Co-authored-by: Benjamin Kietzman --- cpp/src/arrow/dataset/partition.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 14e534a2366..a7b3d362b22 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -657,9 +657,8 @@ class HivePartitioningFactory : public PartitioningFactory { } } - auto schema_result = impl.Finish(&dictionaries_); field_names_ = impl.FieldNames(); - return schema_result; + return impl.Finish(&dictionaries_); } Result> Finish( From 1342f56f45b504165ac0d6218af2f2d1a5a690f6 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 8 Jul 2020 22:06:00 +0200 Subject: [PATCH 3/4] Update cpp/src/arrow/dataset/partition.cc Co-authored-by: Benjamin Kietzman --- cpp/src/arrow/dataset/partition.cc | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index a7b3d362b22..9608c1fbe17 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -663,15 +663,19 @@ class HivePartitioningFactory : public PartitioningFactory { Result> Finish( const std::shared_ptr& schema) const override { - for (FieldRef ref : field_names_) { - // ensure all of field_names_ are present in schema - RETURN_NOT_OK(ref.FindOne(*schema).status()); - } + if (dictionaries_.empty()) { + return std::make_shared(schema, dictionaries_); + } else { + for (FieldRef ref : field_names_) { + // ensure all of field_names_ are present in schema + RETURN_NOT_OK(ref.FindOne(*schema).status()); + } - // drop fields which aren't in field_names_ - auto out_schema = SchemaFromColumnNames(schema, field_names_); + // drop fields which aren't in field_names_ + auto out_schema = SchemaFromColumnNames(schema, field_names_); - return std::make_shared(std::move(out_schema), dictionaries_); + return std::make_shared(std::move(out_schema), dictionaries_); + } } private: From 0e89728f53b07bceb264d4bc48c64ea26b5a1664 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Wed, 8 Jul 2020 22:17:25 +0200 Subject: [PATCH 4/4] remove comment --- cpp/src/arrow/dataset/partition.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 9608c1fbe17..dcb91fa8fac 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -307,7 +307,6 @@ class KeyValuePartitioningInspectImpl { } std::vector FieldNames() { - // only gives sensible result after Finish has been called std::vector names; names.reserve(name_to_index_.size());