From 23c62fddf492f254575327ac51972900962597b2 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Wed, 6 Apr 2022 12:49:59 +0200 Subject: [PATCH 01/19] Deprecate use_legacy_dataset=True in parquet.write_to_dataset and correct some of the tests to not fail due to the change --- python/pyarrow/parquet/__init__.py | 31 ++++++++++++-------- python/pyarrow/tests/parquet/test_dataset.py | 18 ++++++++++-- python/pyarrow/tests/test_dataset.py | 8 +++-- 3 files changed, 38 insertions(+), 19 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 33094dabe68..7de5d636330 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2926,7 +2926,7 @@ def _mkdir_if_not_exists(fs, path): def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, - use_legacy_dataset=None, **kwargs): + use_legacy_dataset=False, **kwargs): """Wrapper around parquet.write_table for writing a Table to Parquet format by partitions. For each combination of partition columns and values, @@ -2962,11 +2962,11 @@ def write_to_dataset(table, root_path, partition_cols=None, and allow you to override the partition filename. If nothing is passed, the filename will consist of a uuid. use_legacy_dataset : bool - Default is True unless a ``pyarrow.fs`` filesystem is passed. - Set to False to enable the new code path (experimental, using the - new Arrow Dataset API). This is more efficient when using partition - columns, but does not (yet) support `partition_filename_cb` and - `metadata_collector` keywords. + Default is False. Set to True to use the the legacy behaviour + (this option is deprecated, and the legacy implementation will be + removed in a future version). The legacy implementation still + supports `partition_filename_cb` and `metadata_collector` keywords + but is less efficient when using partition columns. **kwargs : dict, Additional kwargs for write_table function. See docstring for `write_table` or `ParquetWriter` for more information. 
@@ -3001,13 +3001,18 @@ def write_to_dataset(table, root_path, partition_cols=None, >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files ['dataset_name_4/part-0.parquet'] """ - if use_legacy_dataset is None: - # if a new filesystem is passed -> default to new implementation - if isinstance(filesystem, FileSystem): - use_legacy_dataset = False - # otherwise the default is still True - else: - use_legacy_dataset = True + if use_legacy_dataset: + warnings.warn( + "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " + "deprecated as of pyarrow 8.0.0, and the legacy implementation will " + "be removed in a future version.", + FutureWarning, stacklevel=2) + + # raise for unsupported keywords + if partition_filename_cb is not None: + warnings.warn( + _DEPR_MSG.format("partition_filename_cb", ""), + FutureWarning, stacklevel=2) if not use_legacy_dataset: import pyarrow.dataset as ds diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 2534cce73c6..5deb12af1e8 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1304,7 +1304,7 @@ def _test_write_to_dataset_no_partitions(base_path, # Without partitions, append files to root_path n = 5 for i in range(n): - pq.write_to_dataset(output_table, base_path, + pq.write_to_dataset(output_table, base_path, use_legacy_dataset=True, filesystem=filesystem) output_files = [file for file in filesystem.ls(str(base_path)) if file.endswith(".parquet")] @@ -1545,8 +1545,8 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset): t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0']) t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0']) # TODO pass use_legacy_dataset (need to fix unique names) - pq.write_to_dataset(t1, root_path=str(path)) - pq.write_to_dataset(t2, root_path=str(path)) + pq.write_to_dataset(t1, root_path=str(path), use_legacy_dataset=True) + pq.write_to_dataset(t2, root_path=str(path), use_legacy_dataset=True) result = pq.ParquetDataset( path, read_dictionary=['f0'], @@ -1716,3 +1716,15 @@ def test_parquet_dataset_deprecated_properties(tempdir): with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"): dataset2.pieces + +@pytest.mark.dataset +def test_parquet_write_to_dataset_deprecated_properties(tempdir): + table = pa.table({'a': [1, 2, 3]}) + path = tempdir / 'data.parquet' + + with pytest.warns(FutureWarning, match="Passing 'use_legacy_dataset=True'"): + pq.write_to_dataset(table, path, use_legacy_dataset=True) + + with pytest.warns(FutureWarning, match="Passing 'use_legacy_dataset=True'"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + partition_filename_cb=lambda x: 'file_name.parquet') diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 9a7f5ea213a..dfeb94a1840 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -971,7 +971,7 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None): path = str(tempdir / "test_parquet_dataset") # write_to_dataset currently requires pandas - pq.write_to_dataset(table, path, + pq.write_to_dataset(table, path, use_legacy_dataset=True, partition_cols=["part"], chunk_size=chunk_size) dataset = ds.dataset( path, format="parquet", partitioning="hive", filesystem=filesystem @@ -1247,7 +1247,8 @@ def _create_dataset_all_types(tempdir, chunk_size=None): path = str(tempdir / 
"test_parquet_dataset_all_types") # write_to_dataset currently requires pandas - pq.write_to_dataset(table, path, chunk_size=chunk_size) + pq.write_to_dataset(table, path, use_legacy_dataset=True, + chunk_size=chunk_size) return table, ds.dataset(path, format="parquet", partitioning="hive") @@ -3046,7 +3047,8 @@ def _create_parquet_dataset_simple(root_path): for i in range(4): table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) pq.write_to_dataset( - table, str(root_path), metadata_collector=metadata_collector + table, str(root_path), use_legacy_dataset=True, + metadata_collector=metadata_collector ) metadata_path = str(root_path / '_metadata') From a75664e6ab0489d3cd8dd938f62f589049a1ad2f Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Wed, 6 Apr 2022 14:09:00 +0200 Subject: [PATCH 02/19] Linter corrections --- python/pyarrow/parquet/__init__.py | 12 ++++++------ python/pyarrow/tests/parquet/test_dataset.py | 9 ++++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 7de5d636330..94836382c3f 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3003,16 +3003,16 @@ def write_to_dataset(table, root_path, partition_cols=None, """ if use_legacy_dataset: warnings.warn( - "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " - "deprecated as of pyarrow 8.0.0, and the legacy implementation will " - "be removed in a future version.", - FutureWarning, stacklevel=2) + "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " + "deprecated as of pyarrow 8.0.0, and the legacy implementation " + "will be removed in a future version.", + FutureWarning, stacklevel=2) # raise for unsupported keywords if partition_filename_cb is not None: warnings.warn( - _DEPR_MSG.format("partition_filename_cb", ""), - FutureWarning, stacklevel=2) + _DEPR_MSG.format("partition_filename_cb", ""), + FutureWarning, stacklevel=2) if not use_legacy_dataset: import pyarrow.dataset as ds diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 5deb12af1e8..4084ce89b05 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1717,14 +1717,17 @@ def test_parquet_dataset_deprecated_properties(tempdir): with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"): dataset2.pieces + @pytest.mark.dataset def test_parquet_write_to_dataset_deprecated_properties(tempdir): table = pa.table({'a': [1, 2, 3]}) path = tempdir / 'data.parquet' - with pytest.warns(FutureWarning, match="Passing 'use_legacy_dataset=True'"): + with pytest.warns(FutureWarning, + match="Passing 'use_legacy_dataset=True'"): pq.write_to_dataset(table, path, use_legacy_dataset=True) - with pytest.warns(FutureWarning, match="Passing 'use_legacy_dataset=True'"): + with pytest.warns(FutureWarning, + match="Passing 'use_legacy_dataset=True'"): pq.write_to_dataset(table, path, use_legacy_dataset=True, - partition_filename_cb=lambda x: 'file_name.parquet') + partition_filename_cb=lambda x: 'filename.parquet') From 5a9f5c5f021ffb3b5026c9ddc923226b143b04e9 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Thu, 7 Apr 2022 06:29:39 +0200 Subject: [PATCH 03/19] Rearrange the code a bit --- python/pyarrow/parquet/__init__.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 
94836382c3f..b073d55263e 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3001,19 +3001,6 @@ def write_to_dataset(table, root_path, partition_cols=None, >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files ['dataset_name_4/part-0.parquet'] """ - if use_legacy_dataset: - warnings.warn( - "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " - "deprecated as of pyarrow 8.0.0, and the legacy implementation " - "will be removed in a future version.", - FutureWarning, stacklevel=2) - - # raise for unsupported keywords - if partition_filename_cb is not None: - warnings.warn( - _DEPR_MSG.format("partition_filename_cb", ""), - FutureWarning, stacklevel=2) - if not use_legacy_dataset: import pyarrow.dataset as ds @@ -3054,6 +3041,13 @@ def file_visitor(written_file): file_visitor=file_visitor) return + if use_legacy_dataset: + warnings.warn( + "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " + "deprecated as of pyarrow 8.0.0, and the legacy implementation " + "will be removed in a future version.", + FutureWarning, stacklevel=2) + fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) _mkdir_if_not_exists(fs, root_path) @@ -3099,6 +3093,11 @@ def file_visitor(written_file): else: if partition_filename_cb: outfile = partition_filename_cb(None) + + # raise for unsupported keywords + warnings.warn( + _DEPR_MSG.format("partition_filename_cb", ""), + FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' full_path = '/'.join([root_path, outfile]) From 359ddc0e725e871b876ebe3adc1aabe77ca1b7f7 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 8 Apr 2022 09:55:31 +0200 Subject: [PATCH 04/19] Revert to using None and checking for partition_filename_cb keyward --- python/pyarrow/parquet/__init__.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index b073d55263e..520caf82045 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2926,7 +2926,7 @@ def _mkdir_if_not_exists(fs, path): def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, - use_legacy_dataset=False, **kwargs): + use_legacy_dataset=None, **kwargs): """Wrapper around parquet.write_table for writing a Table to Parquet format by partitions. 
For each combination of partition columns and values, @@ -3001,6 +3001,15 @@ def write_to_dataset(table, root_path, partition_cols=None, >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files ['dataset_name_4/part-0.parquet'] """ + if use_legacy_dataset is None: + # if partition_filename_cb is specified -> + # default to the old implementation + if partition_filename_cb: + use_legacy_dataset = True + # otherwise the default is False + else: + use_legacy_dataset = False + if not use_legacy_dataset: import pyarrow.dataset as ds From eb891c949e4e4c230f452b14a13d03d82aa92038 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 8 Apr 2022 10:05:47 +0200 Subject: [PATCH 05/19] Add info to partition_filename_cb depr msg and add it where missing --- python/pyarrow/parquet/__init__.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 520caf82045..1319ce488a0 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3090,6 +3090,15 @@ def file_visitor(written_file): _mkdir_if_not_exists(fs, '/'.join([root_path, subdir])) if partition_filename_cb: outfile = partition_filename_cb(keys) + + # raise for unsupported keywords + warnings.warn( + _DEPR_MSG.format("partition_filename_cb", + " Specify 'use_legacy_dataset=False' while constructing the " + "ParquetDataset, and then use the 'basename_template' " + "parameter instead. For usage see " + "`pyarrow.dataset.write_dataset`"), + FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' relative_path = '/'.join([subdir, outfile]) @@ -3105,7 +3114,11 @@ def file_visitor(written_file): # raise for unsupported keywords warnings.warn( - _DEPR_MSG.format("partition_filename_cb", ""), + _DEPR_MSG.format("partition_filename_cb", + " Specify 'use_legacy_dataset=False' while constructing the " + "ParquetDataset, and then use the 'basename_template' " + "parameter instead. For usage see " + "`pyarrow.dataset.write_dataset`"), FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' From 90e6b72101348086a77d22e1c6dba55fefe5905f Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 8 Apr 2022 11:26:44 +0200 Subject: [PATCH 06/19] Add change to the write_to_dataset docstring --- python/pyarrow/parquet/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 1319ce488a0..c39794139ac 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2927,7 +2927,7 @@ def _mkdir_if_not_exists(fs, path): def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, use_legacy_dataset=None, **kwargs): - """Wrapper around parquet.write_table for writing a Table to + """Wrapper around parquet.write_dataset for writing a Table to Parquet format by partitions. 
For each combination of partition columns and values, a subdirectories are created in the following @@ -2936,14 +2936,14 @@ def write_to_dataset(table, root_path, partition_cols=None, root_dir/ group1=value1 group2=value1 - .parquet + part-0.parquet group2=value2 - .parquet + part-0.parquet group1=valueN group2=value1 - .parquet + part-0.parquet group2=valueN - .parquet + part-0.parquet Parameters ---------- From 55069f0afbfd36d30cba18b28fa5b705e6b61f6c Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 8 Apr 2022 14:28:56 +0200 Subject: [PATCH 07/19] Expose all write_dataset keywords to write_to_dataset --- python/pyarrow/parquet/__init__.py | 42 +++++++++++++++----- python/pyarrow/tests/parquet/test_dataset.py | 36 ++++++++++++++++- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index c39794139ac..a1be042f85c 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2926,7 +2926,10 @@ def _mkdir_if_not_exists(fs, path): def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, - use_legacy_dataset=None, **kwargs): + use_legacy_dataset=None, format=None, + file_options=None, schema=None, + partitioning=None, use_threads=None, + file_visitor=None, **kwargs): """Wrapper around parquet.write_dataset for writing a Table to Parquet format by partitions. For each combination of partition columns and values, @@ -3050,12 +3053,37 @@ def file_visitor(written_file): file_visitor=file_visitor) return + # warnings and errors when using legecy implementation if use_legacy_dataset: warnings.warn( "Passing 'use_legacy_dataset=True' to get the legacy behaviour is " "deprecated as of pyarrow 8.0.0, and the legacy implementation " "will be removed in a future version.", FutureWarning, stacklevel=2) + msg2 = ( + "The '{}' argument is not supported with the legacy " + "implementation. To use this argument specify " + "'use_legacy_dataset=False' while constructing the " + "ParquetDataset." + ) + if format is not None: + raise ValueError(msg2.format("format")) + if file_options is not None: + raise ValueError(msg2.format("file_options")) + if schema is not None: + raise ValueError(msg2.format("schema")) + if partitioning is not None: + raise ValueError(msg2.format("partitioning")) + if use_threads is not None: + raise ValueError(msg2.format("use_threads")) + if file_visitor is not None: + raise ValueError(msg2.format("file_visitor")) + msg3 = ( + " Specify 'use_legacy_dataset=False' while " + " constructing the ParquetDataset, and then use the " + "'basename_template' parameter instead. For usage see " + "`pyarrow.dataset.write_dataset`" + ) fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) @@ -3093,11 +3121,7 @@ def file_visitor(written_file): # raise for unsupported keywords warnings.warn( - _DEPR_MSG.format("partition_filename_cb", - " Specify 'use_legacy_dataset=False' while constructing the " - "ParquetDataset, and then use the 'basename_template' " - "parameter instead. 
For usage see " - "`pyarrow.dataset.write_dataset`"), + _DEPR_MSG.format("partition_filename_cb", msg3), FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' @@ -3114,11 +3138,7 @@ def file_visitor(written_file): # raise for unsupported keywords warnings.warn( - _DEPR_MSG.format("partition_filename_cb", - " Specify 'use_legacy_dataset=False' while constructing the " - "ParquetDataset, and then use the 'basename_template' " - "parameter instead. For usage see " - "`pyarrow.dataset.write_dataset`"), + _DEPR_MSG.format("partition_filename_cb", msg3), FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 4084ce89b05..bcc695f682d 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1727,7 +1727,41 @@ def test_parquet_write_to_dataset_deprecated_properties(tempdir): match="Passing 'use_legacy_dataset=True'"): pq.write_to_dataset(table, path, use_legacy_dataset=True) + # check also that legacy implementation is set when + # partition_filename_cb is specified with pytest.warns(FutureWarning, match="Passing 'use_legacy_dataset=True'"): - pq.write_to_dataset(table, path, use_legacy_dataset=True, + pq.write_to_dataset(table, path, partition_filename_cb=lambda x: 'filename.parquet') + + +@pytest.mark.dataset +def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir): + table = pa.table({'a': [1, 2, 3]}) + path = tempdir / 'data.parquet' + + with pytest.raises(ValueError, match="format"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + format="ipc") + + with pytest.raises(ValueError, match="file_options"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + file_options=True) + + with pytest.raises(ValueError, match="schema"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + schema=pa.schema([ + ('a', pa.int32()) + ])) + + with pytest.raises(ValueError, match="partitioning"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + partitioning=["a"]) + + with pytest.raises(ValueError, match="use_threads"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + use_threads=False) + + with pytest.raises(ValueError, match="file_visitor"): + pq.write_to_dataset(table, path, use_legacy_dataset=True, + file_visitor=lambda x: x) From 19c9440e01632086b6c3063ccbdde00e0a88fff4 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 8 Apr 2022 15:41:06 +0200 Subject: [PATCH 08/19] Add keyword parameters to the docstrings --- python/pyarrow/parquet/__init__.py | 37 ++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index a1be042f85c..7815d9a8e04 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2970,6 +2970,43 @@ def write_to_dataset(table, root_path, partition_cols=None, removed in a future version). The legacy implementation still supports `partition_filename_cb` and `metadata_collector` keywords but is less efficient when using partition columns. + format : FileFormat or str + The format in which to write the dataset. Currently supported: + "parquet", "ipc"/"arrow"/"feather", and "csv". If a FileSystemDataset + is being written and `format` is not specified, it defaults to the + same format as the specified FileSystemDataset. When writing a + Table or RecordBatch, this keyword is required. 
+ file_options : pyarrow.dataset.FileWriteOptions, optional + FileFormat specific write options, created using the + ``FileFormat.make_write_options()`` function. + use_threads : bool, default True + Write files in parallel. If enabled, then maximum parallelism will be + used determined by the number of available CPU cores. + schema : Schema, optional + partitioning : Partitioning or list[str], optional + The partitioning scheme specified with the ``partitioning()`` + function or a list of field names. When providing a list of + field names, you can use ``partitioning_flavor`` to drive which + partitioning type should be used. + file_visitor : function + If set, this function will be called with a WrittenFile instance + for each file created during the call. This object will have both + a path attribute and a metadata attribute. + + The path attribute will be a string containing the path to + the created file. + + The metadata attribute will be the parquet metadata of the file. + This metadata will have the file path attribute set and can be used + to build a _metadata file. The metadata attribute will be None if + the format is not parquet. + + Example visitor which simple collects the filenames created:: + + visited_paths = [] + + def file_visitor(written_file): + visited_paths.append(written_file.path) **kwargs : dict, Additional kwargs for write_table function. See docstring for `write_table` or `ParquetWriter` for more information. From 1417886fc5747b86efd1cc6cbc6bbdab372d70ad Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 09:29:15 +0200 Subject: [PATCH 09/19] Remove format from write_to_dataset keywords --- python/pyarrow/parquet/__init__.py | 8 -------- python/pyarrow/tests/parquet/test_dataset.py | 4 ---- 2 files changed, 12 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 7815d9a8e04..510cd89b9ad 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2970,12 +2970,6 @@ def write_to_dataset(table, root_path, partition_cols=None, removed in a future version). The legacy implementation still supports `partition_filename_cb` and `metadata_collector` keywords but is less efficient when using partition columns. - format : FileFormat or str - The format in which to write the dataset. Currently supported: - "parquet", "ipc"/"arrow"/"feather", and "csv". If a FileSystemDataset - is being written and `format` is not specified, it defaults to the - same format as the specified FileSystemDataset. When writing a - Table or RecordBatch, this keyword is required. file_options : pyarrow.dataset.FileWriteOptions, optional FileFormat specific write options, created using the ``FileFormat.make_write_options()`` function. @@ -3103,8 +3097,6 @@ def file_visitor(written_file): "'use_legacy_dataset=False' while constructing the " "ParquetDataset." 
) - if format is not None: - raise ValueError(msg2.format("format")) if file_options is not None: raise ValueError(msg2.format("file_options")) if schema is not None: diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index bcc695f682d..6e1ee1b8895 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1740,10 +1740,6 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir): table = pa.table({'a': [1, 2, 3]}) path = tempdir / 'data.parquet' - with pytest.raises(ValueError, match="format"): - pq.write_to_dataset(table, path, use_legacy_dataset=True, - format="ipc") - with pytest.raises(ValueError, match="file_options"): pq.write_to_dataset(table, path, use_legacy_dataset=True, file_options=True) From 52b7f5ade444fa30eb478086301dba2bf777c940 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 09:34:20 +0200 Subject: [PATCH 10/19] Move the check and warning for partition_filename_cb --- python/pyarrow/parquet/__init__.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 510cd89b9ad..6a1e9fa07be 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3107,12 +3107,14 @@ def file_visitor(written_file): raise ValueError(msg2.format("use_threads")) if file_visitor is not None: raise ValueError(msg2.format("file_visitor")) - msg3 = ( - " Specify 'use_legacy_dataset=False' while " - " constructing the ParquetDataset, and then use the " - "'basename_template' parameter instead. For usage see " - "`pyarrow.dataset.write_dataset`" - ) + if partition_filename_cb is not None: + warnings.warn( + _DEPR_MSG.format("partition_filename_cb", " Specify " + "'use_legacy_dataset=False' while constructing " + "the ParquetDataset, and then use the " + "'basename_template' parameter instead. 
For usage " + "see `pyarrow.dataset.write_dataset`"), + FutureWarning, stacklevel=2) fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) @@ -3147,11 +3149,6 @@ def file_visitor(written_file): _mkdir_if_not_exists(fs, '/'.join([root_path, subdir])) if partition_filename_cb: outfile = partition_filename_cb(keys) - - # raise for unsupported keywords - warnings.warn( - _DEPR_MSG.format("partition_filename_cb", msg3), - FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' relative_path = '/'.join([subdir, outfile]) @@ -3164,11 +3161,6 @@ def file_visitor(written_file): else: if partition_filename_cb: outfile = partition_filename_cb(None) - - # raise for unsupported keywords - warnings.warn( - _DEPR_MSG.format("partition_filename_cb", msg3), - FutureWarning, stacklevel=2) else: outfile = guid() + '.parquet' full_path = '/'.join([root_path, outfile]) From 26ad057d341e4f1c5b0185be6a95b5b38fe58b49 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 09:36:29 +0200 Subject: [PATCH 11/19] Linter corrections --- python/pyarrow/parquet/__init__.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 6a1e9fa07be..e2c7972ffcc 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -3110,10 +3110,10 @@ def file_visitor(written_file): if partition_filename_cb is not None: warnings.warn( _DEPR_MSG.format("partition_filename_cb", " Specify " - "'use_legacy_dataset=False' while constructing " - "the ParquetDataset, and then use the " - "'basename_template' parameter instead. For usage " - "see `pyarrow.dataset.write_dataset`"), + "'use_legacy_dataset=False' while constructing " + "the ParquetDataset, and then use the " + "'basename_template' parameter instead. For " + "usage see `pyarrow.dataset.write_dataset`"), FutureWarning, stacklevel=2) fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) From b0e4423e8ac527dd521349f65b73f056b1b2a169 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 13:37:56 +0200 Subject: [PATCH 12/19] Add basename_tamplate to new implementation - mimic old implementation --- python/pyarrow/parquet/__init__.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index e2c7972ffcc..6d32f7fe9a8 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2928,8 +2928,8 @@ def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, use_legacy_dataset=None, format=None, file_options=None, schema=None, - partitioning=None, use_threads=None, - file_visitor=None, **kwargs): + partitioning=None, basename_template=None, + use_threads=None, file_visitor=None, **kwargs): """Wrapper around parquet.write_dataset for writing a Table to Parquet format by partitions. For each combination of partition columns and values, @@ -2982,6 +2982,11 @@ def write_to_dataset(table, root_path, partition_cols=None, function or a list of field names. When providing a list of field names, you can use ``partitioning_flavor`` to drive which partitioning type should be used. + basename_template : str, optional + A template string used to generate basenames of written data files. + The token '{i}' will be replaced with an automatically incremented + integer. If not specified, it defaults to + "part-{i}." 
+ format.default_extname file_visitor : function If set, this function will be called with a WrittenFile instance for each file created during the call. This object will have both @@ -3077,11 +3082,16 @@ def file_visitor(written_file): part_schema = table.select(partition_cols).schema partitioning = ds.partitioning(part_schema, flavor="hive") + if basename_template is None: + basename_template = guid() + '{i}.parquet' + ds.write_dataset( table, root_path, filesystem=filesystem, format=parquet_format, file_options=write_options, schema=schema, partitioning=partitioning, use_threads=use_threads, - file_visitor=file_visitor) + file_visitor=file_visitor, + basename_template=basename_template, + existing_data_behavior='overwrite_or_ignore') return # warnings and errors when using legecy implementation From 0515a7d4b23bddc3887863530c3da1cfbd998e9c Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 13:42:50 +0200 Subject: [PATCH 13/19] Make corrections to the docstrings --- python/pyarrow/parquet/__init__.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 6d32f7fe9a8..c11c95b9a5c 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2939,14 +2939,14 @@ def write_to_dataset(table, root_path, partition_cols=None, root_dir/ group1=value1 group2=value1 - part-0.parquet + .parquet group2=value2 - part-0.parquet + .parquet group1=valueN group2=value1 - part-0.parquet + .parquet group2=valueN - part-0.parquet + .parquet Parameters ---------- @@ -3027,16 +3027,13 @@ def file_visitor(written_file): >>> import pyarrow.parquet as pq >>> pq.write_to_dataset(table, root_path='dataset_name_3', - ... partition_cols=['year'], - ... use_legacy_dataset=False - ... ) + ... partition_cols=['year']) >>> pq.ParquetDataset('dataset_name_3', use_legacy_dataset=False).files ['dataset_name_3/year=2019/part-0.parquet', ... Write a single Parquet file into the root folder: - >>> pq.write_to_dataset(table, root_path='dataset_name_4', - ... use_legacy_dataset=False) + >>> pq.write_to_dataset(table, root_path='dataset_name_4') >>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files ['dataset_name_4/part-0.parquet'] """ From 4477b5f87d5c9ebeb1764c0734b775d0b67f942a Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 14:13:45 +0200 Subject: [PATCH 14/19] Remove format from keywords --- python/pyarrow/parquet/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index c11c95b9a5c..943c59fb13d 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2926,9 +2926,8 @@ def _mkdir_if_not_exists(fs, path): def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, - use_legacy_dataset=None, format=None, - file_options=None, schema=None, - partitioning=None, basename_template=None, + use_legacy_dataset=None, file_options=None, + schema=None, partitioning=None, basename_template=None, use_threads=None, file_visitor=None, **kwargs): """Wrapper around parquet.write_dataset for writing a Table to Parquet format by partitions. 
From 71c97dcac22c1d7df8087236ed5143e56ac97218 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 14:54:31 +0200 Subject: [PATCH 15/19] Remove file_options keyword --- python/pyarrow/parquet/__init__.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 943c59fb13d..74795c3a683 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2926,8 +2926,8 @@ def _mkdir_if_not_exists(fs, path): def write_to_dataset(table, root_path, partition_cols=None, partition_filename_cb=None, filesystem=None, - use_legacy_dataset=None, file_options=None, - schema=None, partitioning=None, basename_template=None, + use_legacy_dataset=None, schema=None, + partitioning=None, basename_template=None, use_threads=None, file_visitor=None, **kwargs): """Wrapper around parquet.write_dataset for writing a Table to Parquet format by partitions. @@ -2969,9 +2969,6 @@ def write_to_dataset(table, root_path, partition_cols=None, removed in a future version). The legacy implementation still supports `partition_filename_cb` and `metadata_collector` keywords but is less efficient when using partition columns. - file_options : pyarrow.dataset.FileWriteOptions, optional - FileFormat specific write options, created using the - ``FileFormat.make_write_options()`` function. use_threads : bool, default True Write files in parallel. If enabled, then maximum parallelism will be used determined by the number of available CPU cores. @@ -3103,8 +3100,6 @@ def file_visitor(written_file): "'use_legacy_dataset=False' while constructing the " "ParquetDataset." ) - if file_options is not None: - raise ValueError(msg2.format("file_options")) if schema is not None: raise ValueError(msg2.format("schema")) if partitioning is not None: From 170ef523278d8d4e2f21123324372af2492a5fe9 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 15:01:53 +0200 Subject: [PATCH 16/19] Amend the tests to use use_legacy_dataset=use_legacy_dataset --- python/pyarrow/tests/parquet/test_dataset.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/tests/parquet/test_dataset.py b/python/pyarrow/tests/parquet/test_dataset.py index 6e1ee1b8895..6326743113f 100644 --- a/python/pyarrow/tests/parquet/test_dataset.py +++ b/python/pyarrow/tests/parquet/test_dataset.py @@ -1304,7 +1304,8 @@ def _test_write_to_dataset_no_partitions(base_path, # Without partitions, append files to root_path n = 5 for i in range(n): - pq.write_to_dataset(output_table, base_path, use_legacy_dataset=True, + pq.write_to_dataset(output_table, base_path, + use_legacy_dataset=use_legacy_dataset, filesystem=filesystem) output_files = [file for file in filesystem.ls(str(base_path)) if file.endswith(".parquet")] @@ -1544,9 +1545,10 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset): path = tempdir / "ARROW-3325-dataset" t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0']) t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0']) - # TODO pass use_legacy_dataset (need to fix unique names) - pq.write_to_dataset(t1, root_path=str(path), use_legacy_dataset=True) - pq.write_to_dataset(t2, root_path=str(path), use_legacy_dataset=True) + pq.write_to_dataset(t1, root_path=str(path), + use_legacy_dataset=use_legacy_dataset) + pq.write_to_dataset(t2, root_path=str(path), + use_legacy_dataset=use_legacy_dataset) result = pq.ParquetDataset( path, 
read_dictionary=['f0'], @@ -1740,10 +1742,6 @@ def test_parquet_write_to_dataset_unsupported_keywards_in_legacy(tempdir): table = pa.table({'a': [1, 2, 3]}) path = tempdir / 'data.parquet' - with pytest.raises(ValueError, match="file_options"): - pq.write_to_dataset(table, path, use_legacy_dataset=True, - file_options=True) - with pytest.raises(ValueError, match="schema"): pq.write_to_dataset(table, path, use_legacy_dataset=True, schema=pa.schema([ From aa33d714ab5793f4176fc730373897fd0eea63f8 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Fri, 15 Apr 2022 14:47:17 +0200 Subject: [PATCH 17/19] Apply suggestions from code review Co-authored-by: Joris Van den Bossche --- python/pyarrow/parquet/__init__.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 74795c3a683..5739148daa1 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2981,8 +2981,7 @@ def write_to_dataset(table, root_path, partition_cols=None, basename_template : str, optional A template string used to generate basenames of written data files. The token '{i}' will be replaced with an automatically incremented - integer. If not specified, it defaults to - "part-{i}." + format.default_extname + integer. If not specified, it defaults to "guid-{i}.parquet" file_visitor : function If set, this function will be called with a WrittenFile instance for each file created during the call. This object will have both @@ -3076,7 +3075,7 @@ def file_visitor(written_file): partitioning = ds.partitioning(part_schema, flavor="hive") if basename_template is None: - basename_template = guid() + '{i}.parquet' + basename_template = guid() + '-{i}.parquet' ds.write_dataset( table, root_path, filesystem=filesystem, From c2cf5291847e8de12c9abc0b55e336bf26cdd48b Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Tue, 19 Apr 2022 19:51:30 +0200 Subject: [PATCH 18/19] Update python/pyarrow/parquet/__init__.py Co-authored-by: Joris Van den Bossche --- python/pyarrow/parquet/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 5739148daa1..2666d175d28 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2974,7 +2974,7 @@ def write_to_dataset(table, root_path, partition_cols=None, used determined by the number of available CPU cores. schema : Schema, optional partitioning : Partitioning or list[str], optional - The partitioning scheme specified with the ``partitioning()`` + The partitioning scheme specified with the ``pyarrow.dataset.partitioning()`` function or a list of field names. When providing a list of field names, you can use ``partitioning_flavor`` to drive which partitioning type should be used. 
From 6c88c332e1e4040da43a0039416c4a9ecb9f83d1 Mon Sep 17 00:00:00 2001 From: Alenka Frim Date: Tue, 19 Apr 2022 20:08:30 +0200 Subject: [PATCH 19/19] Correct _create_parquet_dataset_simple helper function and linter error --- python/pyarrow/parquet/__init__.py | 9 +++++---- python/pyarrow/tests/test_dataset.py | 3 +-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/parquet/__init__.py b/python/pyarrow/parquet/__init__.py index 2666d175d28..867babdaf81 100644 --- a/python/pyarrow/parquet/__init__.py +++ b/python/pyarrow/parquet/__init__.py @@ -2974,10 +2974,11 @@ def write_to_dataset(table, root_path, partition_cols=None, used determined by the number of available CPU cores. schema : Schema, optional partitioning : Partitioning or list[str], optional - The partitioning scheme specified with the ``pyarrow.dataset.partitioning()`` - function or a list of field names. When providing a list of - field names, you can use ``partitioning_flavor`` to drive which - partitioning type should be used. + The partitioning scheme specified with the + ``pyarrow.dataset.partitioning()`` function or a list of field names. + When providing a list of field names, you can use + ``partitioning_flavor`` to drive which partitioning type should be + used. basename_template : str, optional A template string used to generate basenames of written data files. The token '{i}' will be replaced with an automatically incremented diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index dfeb94a1840..19448d36870 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3047,8 +3047,7 @@ def _create_parquet_dataset_simple(root_path): for i in range(4): table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) pq.write_to_dataset( - table, str(root_path), use_legacy_dataset=True, - metadata_collector=metadata_collector + table, str(root_path), metadata_collector=metadata_collector ) metadata_path = str(root_path / '_metadata')