From 3df165ec671ac1ae4a7de75cb57e20efbbf594a0 Mon Sep 17 00:00:00 2001
From: Alenka Frim
Date: Wed, 4 May 2022 11:39:53 +0200
Subject: [PATCH 1/3] Fix write_to_dataset to not ignore the partitioning and
 file_visitor keywords

---
 python/pyarrow/parquet/__init__.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 967d39d3db0..94e6dce21c4 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -3125,7 +3125,6 @@ def file_visitor(written_file):
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
-        file_visitor = None
         if metadata_collector is not None:
             def file_visitor(written_file):
                 metadata_collector.append(written_file.metadata)
@@ -3140,7 +3139,6 @@ def file_visitor(written_file):
         if filesystem is not None:
             filesystem = _ensure_filesystem(filesystem)
 
-        partitioning = None
         if partition_cols:
             part_schema = table.select(partition_cols).schema
             partitioning = ds.partitioning(part_schema, flavor="hive")

From 0f11634435f066fb57ef64d114f7888e03ed4cdb Mon Sep 17 00:00:00 2001
From: Alenka Frim
Date: Wed, 4 May 2022 15:13:33 +0200
Subject: [PATCH 2/3] Add a test for the partitioning keyword

---
 python/pyarrow/tests/parquet/test_dataset.py | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index 1bcaf4b9e45..b723ee1548d 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -1782,3 +1782,21 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir):
     with pytest.raises(ValueError, match="existing_data_behavior"):
         pq.write_to_dataset(table, path, use_legacy_dataset=True,
                             existing_data_behavior='error')
+
+
+@pytest.mark.dataset
+def test_parquet_write_to_dataset_exposed_keywords(tempdir):
+    table = pa.table({'a': [1, 2, 3]})
+    path = tempdir / 'data.parquet'
+
+    # Check that the partitioning keyword is not ignored
+    pq.write_to_dataset(table, path, partitioning=["a"],
+                        use_legacy_dataset=False)
+    dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
+    assert len(dataset.files) == 3
+
+    path2 = tempdir / 'data2.parquet'
+    # This should be a single parquet file if the partitioning keyword is not set
+    pq.write_to_dataset(table, path2, use_legacy_dataset=False)
+    dataset2 = pq.ParquetDataset(path2, use_legacy_dataset=False)
+    assert len(dataset2.files) == 1

From 67a1bd8e9a3d6351a3beb4d2ef1fa1ea10b3bcf0 Mon Sep 17 00:00:00 2001
From: Alenka Frim
Date: Thu, 5 May 2022 08:54:07 +0200
Subject: [PATCH 3/3] Rearrange the test to also check the file_visitor
 keyword, and correct an error in the existing_data_behavior check in
 write_to_dataset

---
 python/pyarrow/parquet/__init__.py           |  5 ++--
 python/pyarrow/tests/parquet/test_dataset.py | 29 +++++++++++++-------
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py
index 94e6dce21c4..a9437aec05d 100644
--- a/python/pyarrow/parquet/__init__.py
+++ b/python/pyarrow/parquet/__init__.py
@@ -3145,8 +3145,9 @@ def file_visitor(written_file):
 
         if basename_template is None:
             basename_template = guid() + '-{i}.parquet'
-            if existing_data_behavior is None:
-                existing_data_behavior = 'overwrite_or_ignore'
+
+        if existing_data_behavior is None:
+            existing_data_behavior = 'overwrite_or_ignore'
 
         ds.write_dataset(
             table, root_path, filesystem=filesystem,
diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py
index b723ee1548d..bbf27a98a3b 100644
--- a/python/pyarrow/tests/parquet/test_dataset.py
+++ b/python/pyarrow/tests/parquet/test_dataset.py
@@ -17,6 +17,7 @@
 
 import datetime
 import os
+import pathlib
 
 import numpy as np
 import pytest
@@ -1787,16 +1788,24 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir):
 @pytest.mark.dataset
 def test_parquet_write_to_dataset_exposed_keywords(tempdir):
     table = pa.table({'a': [1, 2, 3]})
-    path = tempdir / 'data.parquet'
+    path = tempdir / 'partitioning'
+
+    paths_written = []
+
+    def file_visitor(written_file):
+        paths_written.append(written_file.path)
+
+    basename_template = 'part-{i}.parquet'
 
-    # Check that the partitioning keyword is not ignored
     pq.write_to_dataset(table, path, partitioning=["a"],
+                        file_visitor=file_visitor,
+                        basename_template=basename_template,
                         use_legacy_dataset=False)
-    dataset = pq.ParquetDataset(path, use_legacy_dataset=False)
-    assert len(dataset.files) == 3
-
-    path2 = tempdir / 'data2.parquet'
-    # This should be a single parquet file if the partitioning keyword is not set
-    pq.write_to_dataset(table, path2, use_legacy_dataset=False)
-    dataset2 = pq.ParquetDataset(path2, use_legacy_dataset=False)
-    assert len(dataset2.files) == 1
+
+    expected_paths = {
+        path / '1' / 'part-0.parquet',
+        path / '2' / 'part-0.parquet',
+        path / '3' / 'part-0.parquet'
+    }
+    paths_written_set = set(map(pathlib.Path, paths_written))
+    assert paths_written_set == expected_paths
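
Usage note (illustrative, not part of the patch series): a minimal sketch of the
behavior these patches restore, mirroring the rearranged test above. The target
directory and the `collected` list are hypothetical names; the keywords
(partitioning, basename_template, file_visitor, use_legacy_dataset) are the ones
exercised in the test.

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({'a': [1, 2, 3]})
    collected = []

    # Called once per file written by the dataset writer.
    def file_visitor(written_file):
        collected.append(written_file.path)

    # Before the fix, the partitioning and file_visitor keywords were silently
    # reset to None inside write_to_dataset; with the fix they are passed
    # through to ds.write_dataset.
    pq.write_to_dataset(table, '/tmp/exposed_keywords_example',  # hypothetical path
                        partitioning=['a'],
                        basename_template='part-{i}.parquet',
                        file_visitor=file_visitor,
                        use_legacy_dataset=False)

    # One file per distinct value of column 'a', e.g. .../1/part-0.parquet
    assert len(collected) == 3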