From 4a8c02c75a6f8e28085ae2dd605a0da948030688 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 8 Apr 2022 15:36:30 +0200 Subject: [PATCH 1/8] First try at passing existing_data_behavior --- python/pyarrow/parquet/__init__.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 4e6b85f22cd..a5cbcc11e95 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2929,8 +2929,9 @@ def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, use_legacy_dataset=None, schema=None, partitioning=None, basename_template=None, - use_threads=None, file_visitor=None, **kwargs): - """Wrapper around parquet.write_dataset for writing a Table to + use_threads=None, file_visitor=None, + existing_data_behavior='error', **kwargs): + """Wrapper around parquet.write_table for writing a Table to Parquet format by partitions. For each combination of partition columns and values, a subdirectories are created in the following @@ -3003,6 +3004,23 @@ def write_to_dataset(table, root_path, partition_cols=None, def file_visitor(written_file): visited_paths.append(written_file.path) + existing_data_behavior : 'error' | 'overwrite_or_ignore' | \ +'delete_matching' + It is used in the new code path using the new Arrow Dataset API. + In case the legacy implementation is selected the parameter + is ignored as the old implementation does not support it. + Controls how the dataset will handle data that already exists in + the destination. The default behavior ('error') is to raise an error + if any data exists in the destination. + 'overwrite_or_ignore' will ignore any existing data and will + overwrite files with the same name as an output file. Other + existing files will be ignored. This behavior, in combination + with a unique basename_template for each write, will allow for + an append workflow. + 'delete_matching' is useful when you are writing a partitioned + dataset. The first time each partition directory is encountered + the entire directory will be deleted. This allows you to overwrite + old partitions completely. **kwargs : dict, Additional kwargs for write_table function. See docstring for `write_table` or `ParquetWriter` for more information. From 86c25ea0c78d35ee599ec8c2c6a36020b91f78da Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 21 Apr 2022 11:02:36 +0200 Subject: [PATCH 2/8] Make some changes after rebasing --- python/pyarrow/parquet/__init__.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index a5cbcc11e95..c4ff3a7927c 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2930,7 +2930,8 @@ def write_to_dataset(table, root_path, partition_cols=None, use_legacy_dataset=None, schema=None, partitioning=None, basename_template=None, use_threads=None, file_visitor=None, - existing_data_behavior='error', **kwargs): + existing_data_behavior='overwrite_or_ignore', + **kwargs): """Wrapper around parquet.write_table for writing a Table to Parquet format by partitions. 
For each combination of partition columns and values, @@ -3004,19 +3005,20 @@ def write_to_dataset(table, root_path, partition_cols=None, def file_visitor(written_file): visited_paths.append(written_file.path) - existing_data_behavior : 'error' | 'overwrite_or_ignore' | \ + existing_data_behavior : 'overwrite_or_ignore' | 'error' | \ 'delete_matching' It is used in the new code path using the new Arrow Dataset API. In case the legacy implementation is selected the parameter is ignored as the old implementation does not support it. Controls how the dataset will handle data that already exists in - the destination. The default behavior ('error') is to raise an error - if any data exists in the destination. - 'overwrite_or_ignore' will ignore any existing data and will + the destination. The default behavior + ('overwrite_or_ignore') will ignore any existing data and will overwrite files with the same name as an output file. Other existing files will be ignored. This behavior, in combination with a unique basename_template for each write, will allow for an append workflow. + 'error' will raise an error if any data exists in the destination. + if any data exists in the destination. 'delete_matching' is useful when you are writing a partitioned dataset. The first time each partition directory is encountered the entire directory will be deleted. This allows you to overwrite @@ -3103,7 +3105,7 @@ def file_visitor(written_file): partitioning=partitioning, use_threads=use_threads, file_visitor=file_visitor, basename_template=basename_template, - existing_data_behavior='overwrite_or_ignore') + existing_data_behavior=existing_data_behavior) return # warnings and errors when using legecy implementation @@ -3135,6 +3137,14 @@ def file_visitor(written_file): "'basename_template' parameter instead. For " "usage see `pyarrow.dataset.write_dataset`"), FutureWarning, stacklevel=2) + if existing_data_behavior is not None: + warnings.warn( + _DEPR_MSG.format("existing_data_behavior", " Specify " + "'use_legacy_dataset=False' while constructing " + "the ParquetDataset, and then use the " + "'basename_template' parameter instead. 
For " + "usage see `pyarrow.dataset.write_dataset`"), + FutureWarning, stacklevel=2) fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) From b3d6f5fc15d117807678fc88e43fe492a45b75c1 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 21 Apr 2022 11:42:40 +0200 Subject: [PATCH 3/8] Add a test for the deprecation warning --- python/pyarrow/tests/parquet/test_dataset.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 6477132dcd3..43222da480a 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1741,6 +1741,11 @@ def test_parquet_write_to_dataset_deprecated_properties(tempdir): pq.write_to_dataset(table, path, partition_filename_cb=lambda x: 'filename.parquet') + with pytest.warns(FutureWarning, + match="Passing 'use_legacy_dataset=True'"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + existing_data_behavior='error') + @pytest.mark.dataset def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir): From eb234792e2e4310c10374558c6768d95772bc0cd Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 21 Apr 2022 20:25:25 +0200 Subject: [PATCH 4/8] Change the default for existing_data_behavior --- python/pyarrow/parquet/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index c4ff3a7927c..c535589b9b8 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2930,7 +2930,7 @@ def write_to_dataset(table, root_path, partition_cols=None, use_legacy_dataset=None, schema=None, partitioning=None, basename_template=None, use_threads=None, file_visitor=None, - existing_data_behavior='overwrite_or_ignore', + existing_data_behavior=None, **kwargs): """Wrapper around parquet.write_table for writing a Table to Parquet format by partitions. @@ -3011,9 +3011,8 @@ def file_visitor(written_file): In case the legacy implementation is selected the parameter is ignored as the old implementation does not support it. Controls how the dataset will handle data that already exists in - the destination. The default behavior - ('overwrite_or_ignore') will ignore any existing data and will - overwrite files with the same name as an output file. Other + the destination. 'overwrite_or_ignore' ignores any existing data and + will overwrite files with the same name as an output file. Other existing files will be ignored. This behavior, in combination with a unique basename_template for each write, will allow for an append workflow. 
@@ -3098,6 +3097,7 @@ def file_visitor(written_file): if basename_template is None: basename_template = guid() + '-{i}.parquet' + existing_data_behavior='overwrite_or_ignore' ds.write_dataset( table, root_path, filesystem=filesystem, From 9e20178bbc5e132c790f82fcc11f106f3e3ca5d2 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 21 Apr 2022 20:27:57 +0200 Subject: [PATCH 5/8] Correct warning for existing_data_behavior --- python/pyarrow/parquet/__init__.py | 10 ++-------- python/pyarrow/tests/parquet/test_dataset.py | 8 +++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index c535589b9b8..44a16ec3267 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3129,6 +3129,8 @@ def file_visitor(written_file): raise ValueError(msg2.format("use_threads")) if file_visitor is not None: raise ValueError(msg2.format("file_visitor")) + if existing_data_behavior is not None: + raise ValueError(msg2.format("existing_data_behavior")) if partition_filename_cb is not None: warnings.warn( _DEPR_MSG.format("partition_filename_cb", " Specify " @@ -3137,14 +3139,6 @@ def file_visitor(written_file): "'basename_template' parameter instead. For " "usage see `pyarrow.dataset.write_dataset`"), FutureWarning, stacklevel=2) - if existing_data_behavior is not None: - warnings.warn( - _DEPR_MSG.format("existing_data_behavior", " Specify " - "'use_legacy_dataset=False' while constructing " - "the ParquetDataset, and then use the " - "'basename_template' parameter instead. For " - "usage see `pyarrow.dataset.write_dataset`"), - FutureWarning, stacklevel=2) fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 43222da480a..4ab6d0ab5ca 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1741,11 +1741,6 @@ def test_parquet_write_to_dataset_deprecated_properties(tempdir): pq.write_to_dataset(table, path, partition_filename_cb=lambda x: 'filename.parquet') - with pytest.warns(FutureWarning, - match="Passing 'use_legacy_dataset=True'"): - pq.write_to_dataset(table, path, use_legacy_dataset=True, - existing_data_behavior='error') - @pytest.mark.dataset def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir): @@ -1769,3 +1764,6 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir): with pytest.raises(ValueError, match="file_visitor"): pq.write_to_dataset(table, path, use_legacy_dataset=True, file_visitor=lambda x: x) + with pytest.raises(ValueError, match="existing_data_behavior"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + existing_data_behavior='error') From 5bc04a1ffdf7c45137d3f28b7146671e4dd3c730 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 21 Apr 2022 20:29:28 +0200 Subject: [PATCH 6/8] Correct linter error --- python/pyarrow/parquet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 44a16ec3267..4a0778c040f 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3097,7 +3097,7 @@ def file_visitor(written_file): if basename_template is None: basename_template = guid() + '-{i}.parquet' - existing_data_behavior='overwrite_or_ignore' + existing_data_behavior = 'overwrite_or_ignore' ds.write_dataset( 
table, root_path, filesystem=filesystem, From 13dafdb8552b205429ff6abcf61bec07f05e9631 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 21 Apr 2022 20:36:16 +0200 Subject: [PATCH 7/8] Add a check when setting existing_data_behavior in the new API --- python/pyarrow/parquet/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 4a0778c040f..1373db80b08 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2985,7 +2985,7 @@ def write_to_dataset(table, root_path, partition_cols=None, basename_template : str, optional A template string used to generate basenames of written data files. The token '{i}' will be replaced with an automatically incremented - integer. If not specified, it defaults to "guid-{i}.parquet" + integer. If not specified, it defaults to "guid-{i}.parquet". file_visitor : function If set, this function will be called with a WrittenFile instance for each file created during the call. This object will have both @@ -3097,7 +3097,8 @@ def file_visitor(written_file): if basename_template is None: basename_template = guid() + '-{i}.parquet' - existing_data_behavior = 'overwrite_or_ignore' + if existing_data_behavior is None: + existing_data_behavior = 'overwrite_or_ignore' ds.write_dataset( table, root_path, filesystem=filesystem, From ac6c018b8bd656cae4d2d8a897175fc12e296a50 Mon Sep 17 00:00:00 2001 From: Joris Van den Bossche Date: Fri, 22 Apr 2022 11:31:00 +0200 Subject: [PATCH 8/8] update docstring --- python/pyarrow/parquet/__init__.py | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 1373db80b08..dc962c60ff8 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2970,8 +2970,8 @@ def write_to_dataset(table, root_path, partition_cols=None, Default is False. Set to True to use the the legacy behaviour (this option is deprecated, and the legacy implementation will be removed in a future version). The legacy implementation still - supports `partition_filename_cb` and `metadata_collector` keywords - but is less efficient when using partition columns. + supports the `partition_filename_cb` keyword but is less efficient + when using partition columns. use_threads : bool, default True Write files in parallel. If enabled, then maximum parallelism will be used determined by the number of available CPU cores. @@ -3007,17 +3007,22 @@ def file_visitor(written_file): visited_paths.append(written_file.path) existing_data_behavior : 'overwrite_or_ignore' | 'error' | \ 'delete_matching' - It is used in the new code path using the new Arrow Dataset API. - In case the legacy implementation is selected the parameter - is ignored as the old implementation does not support it. Controls how the dataset will handle data that already exists in - the destination. 'overwrite_or_ignore' ignores any existing data and - will overwrite files with the same name as an output file. Other + the destination. The default behaviour is 'overwrite_or_ignore'. + + Only used in the new code path using the new Arrow Dataset API + (``use_legacy_dataset=False``). In case the legacy implementation + is selected the parameter is ignored as the old implementation does + not support it (only has the default behaviour). 
+ + 'overwrite_or_ignore' will ignore any existing data and will + overwrite files with the same name as an output file. Other existing files will be ignored. This behavior, in combination with a unique basename_template for each write, will allow for an append workflow. + 'error' will raise an error if any data exists in the destination. - if any data exists in the destination. + 'delete_matching' is useful when you are writing a partitioned dataset. The first time each partition directory is encountered the entire directory will be deleted. This allows you to overwrite
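
Reviewer note: below is a minimal usage sketch of the existing_data_behavior keyword introduced by this patch series, based only on the docstring text and the default handling shown in the diffs above. The table contents, the "my_dataset" path and the "year" partition column are hypothetical placeholders, and the snippet assumes a pyarrow build that already includes these patches.

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"year": [2020, 2021, 2021], "value": [1.0, 2.5, 3.0]})

    # Default behaviour: leaving existing_data_behavior unset falls back to
    # 'overwrite_or_ignore' on the new dataset code path (patch 7), so a
    # repeated write with a fresh guid-based basename_template appends files.
    pq.write_to_dataset(table, "my_dataset", partition_cols=["year"])

    # Rewrite partitions completely: 'delete_matching' deletes each
    # partition directory the first time it is encountered during the write.
    pq.write_to_dataset(table, "my_dataset", partition_cols=["year"],
                        existing_data_behavior="delete_matching")

    # Refuse to write into a non-empty destination: 'error' raises if any
    # data already exists under the root path.
    # pq.write_to_dataset(table, "my_dataset", existing_data_behavior="error")

    # The deprecated legacy implementation does not support the keyword;
    # per patch 5 this combination raises a ValueError rather than warning.
    # pq.write_to_dataset(table, "my_dataset", use_legacy_dataset=True,
    #                     existing_data_behavior="error")

Internally the wrapper forwards the value unchanged to pyarrow.dataset.write_dataset, so the three accepted strings and their semantics match the dataset API documentation.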