104 changes: 86 additions & 18 deletions python/pyarrow/parquet/__init__.py
@@ -2926,8 +2926,10 @@ def _mkdir_if_not_exists(fs, path):

def write_to_dataset(table, root_path, partition_cols=None,
partition_filename_cb=None, filesystem=None,
use_legacy_dataset=None, **kwargs):
"""Wrapper around parquet.write_table for writing a Table to
use_legacy_dataset=None, schema=None,
partitioning=None, basename_template=None,
use_threads=None, file_visitor=None, **kwargs):
"""Wrapper around parquet.write_dataset for writing a Table to
Parquet format by partitions.
For each combination of partition columns and values,
a subdirectory is created in the following
@@ -2962,11 +2964,44 @@ def write_to_dataset(table, root_path, partition_cols=None,
and allow you to override the partition filename. If nothing is
passed, the filename will consist of a uuid.
use_legacy_dataset : bool
Default is True unless a ``pyarrow.fs`` filesystem is passed.
Set to False to enable the new code path (experimental, using the
new Arrow Dataset API). This is more efficient when using partition
columns, but does not (yet) support `partition_filename_cb` and
`metadata_collector` keywords.
Default is False. Set to True to use the legacy behaviour
(this option is deprecated, and the legacy implementation will be
removed in a future version). The legacy implementation still
supports `partition_filename_cb` and `metadata_collector` keywords
but is less efficient when using partition columns.
use_threads : bool, default True
Write files in parallel. If enabled, then maximum parallelism will be
used, as determined by the number of available CPU cores.
schema : Schema, optional
The schema of the written dataset; this is passed through to
``pyarrow.dataset.write_dataset``.
partitioning : Partitioning or list[str], optional
The partitioning scheme specified with the
``pyarrow.dataset.partitioning()`` function or a list of field names.
When providing a list of field names, you can use
``partitioning_flavor`` to drive which partitioning type should be
used.
basename_template : str, optional
A template string used to generate basenames of written data files.
The token '{i}' will be replaced with an automatically incremented
integer. If not specified, it defaults to "guid-{i}.parquet".
file_visitor : function
If set, this function will be called with a WrittenFile instance
for each file created during the call. This object will have both
a path attribute and a metadata attribute.

The path attribute will be a string containing the path to
the created file.

The metadata attribute will be the parquet metadata of the file.
This metadata will have the file path attribute set and can be used
to build a _metadata file. The metadata attribute will be None if
the format is not parquet.

Example visitor which simply collects the filenames created::

visited_paths = []

def file_visitor(written_file):
visited_paths.append(written_file.path)
**kwargs : dict,
Additional kwargs for write_table function. See docstring for
`write_table` or `ParquetWriter` for more information.
@@ -2988,26 +3023,24 @@ def write_to_dataset(table, root_path, partition_cols=None,

>>> import pyarrow.parquet as pq
>>> pq.write_to_dataset(table, root_path='dataset_name_3',
... partition_cols=['year'],
... use_legacy_dataset=False
... )
... partition_cols=['year'])
>>> pq.ParquetDataset('dataset_name_3', use_legacy_dataset=False).files
['dataset_name_3/year=2019/part-0.parquet', ...

Write a single Parquet file into the root folder:

>>> pq.write_to_dataset(table, root_path='dataset_name_4',
... use_legacy_dataset=False)
>>> pq.write_to_dataset(table, root_path='dataset_name_4')
>>> pq.ParquetDataset('dataset_name_4/', use_legacy_dataset=False).files
['dataset_name_4/part-0.parquet']
"""
if use_legacy_dataset is None:
# if a new filesystem is passed -> default to new implementation
if isinstance(filesystem, FileSystem):
use_legacy_dataset = False
# otherwise the default is still True
else:
# if partition_filename_cb is specified ->
# default to the old implementation
if partition_filename_cb:
use_legacy_dataset = True
# otherwise the default is False
else:
use_legacy_dataset = False

if not use_legacy_dataset:
import pyarrow.dataset as ds
@@ -3042,13 +3075,48 @@ def file_visitor(written_file):
part_schema = table.select(partition_cols).schema
partitioning = ds.partitioning(part_schema, flavor="hive")

if basename_template is None:
basename_template = guid() + '-{i}.parquet'

ds.write_dataset(
table, root_path, filesystem=filesystem,
format=parquet_format, file_options=write_options, schema=schema,
partitioning=partitioning, use_threads=use_threads,
file_visitor=file_visitor)
file_visitor=file_visitor,
basename_template=basename_template,
existing_data_behavior='overwrite_or_ignore')
return

# warnings and errors when using the legacy implementation
if use_legacy_dataset:
warnings.warn(
"Passing 'use_legacy_dataset=True' to get the legacy behaviour is "
"deprecated as of pyarrow 8.0.0, and the legacy implementation "
"will be removed in a future version.",
FutureWarning, stacklevel=2)
msg2 = (
"The '{}' argument is not supported with the legacy "
"implementation. To use this argument specify "
"'use_legacy_dataset=False' while constructing the "
"ParquetDataset."
)
if schema is not None:
raise ValueError(msg2.format("schema"))
if partitioning is not None:
raise ValueError(msg2.format("partitioning"))
if use_threads is not None:
raise ValueError(msg2.format("use_threads"))
if file_visitor is not None:
raise ValueError(msg2.format("file_visitor"))
if partition_filename_cb is not None:
warnings.warn(
_DEPR_MSG.format("partition_filename_cb", " Specify "
"'use_legacy_dataset=False' while constructing "
"the ParquetDataset, and then use the "
"'basename_template' parameter instead. For "
"usage see `pyarrow.dataset.write_dataset`"),
FutureWarning, stacklevel=2)

fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)

_mkdir_if_not_exists(fs, root_path)
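For orientation, a rough sketch of the direct pyarrow.dataset call that the rewritten write_to_dataset above delegates to (not taken from this PR; table, root_path and partition_cols are placeholders, and the real code uses pyarrow's internal guid() helper rather than the uuid module):

import uuid
import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({'year': [2019, 2020], 'n': [1, 2]})
root_path = 'dataset_sketch'
partition_cols = ['year']

parquet_format = ds.ParquetFileFormat()
# additional parquet-specific kwargs would be forwarded here
write_options = parquet_format.make_write_options()

# write_to_dataset builds a hive partitioning from the partition columns ...
partitioning = ds.partitioning(table.select(partition_cols).schema,
                               flavor='hive')

# ... and defaults the basename template to '<guid>-{i}.parquet'
basename_template = uuid.uuid4().hex + '-{i}.parquet'

ds.write_dataset(table, root_path,
                 format=parquet_format, file_options=write_options,
                 partitioning=partitioning,
                 basename_template=basename_template,
                 existing_data_behavior='overwrite_or_ignore')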
49 changes: 46 additions & 3 deletions python/pyarrow/tests/parquet/test_dataset.py
@@ -1305,6 +1305,7 @@ def _test_write_to_dataset_no_partitions(base_path,
n = 5
for i in range(n):
pq.write_to_dataset(output_table, base_path,
use_legacy_dataset=use_legacy_dataset,
filesystem=filesystem)
output_files = [file for file in filesystem.ls(str(base_path))
if file.endswith(".parquet")]
@@ -1544,9 +1545,10 @@ def test_dataset_read_dictionary(tempdir, use_legacy_dataset):
path = tempdir / "ARROW-3325-dataset"
t1 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
t2 = pa.table([[util.rands(10) for i in range(5)] * 10], names=['f0'])
# TODO pass use_legacy_dataset (need to fix unique names)
pq.write_to_dataset(t1, root_path=str(path))
pq.write_to_dataset(t2, root_path=str(path))
pq.write_to_dataset(t1, root_path=str(path),
use_legacy_dataset=use_legacy_dataset)
pq.write_to_dataset(t2, root_path=str(path),
use_legacy_dataset=use_legacy_dataset)

result = pq.ParquetDataset(
path, read_dictionary=['f0'],
@@ -1716,3 +1718,44 @@ def test_parquet_dataset_deprecated_properties(tempdir):

with pytest.warns(DeprecationWarning, match="'ParquetDataset.pieces"):
dataset2.pieces


@pytest.mark.dataset
def test_parquet_write_to_dataset_deprecated_properties(tempdir):
table = pa.table({'a': [1, 2, 3]})
path = tempdir / 'data.parquet'

with pytest.warns(FutureWarning,
match="Passing 'use_legacy_dataset=True'"):
pq.write_to_dataset(table, path, use_legacy_dataset=True)

# check also that the legacy implementation is used when
# partition_filename_cb is specified
with pytest.warns(FutureWarning,
match="Passing 'use_legacy_dataset=True'"):
pq.write_to_dataset(table, path,
partition_filename_cb=lambda x: 'filename.parquet')


@pytest.mark.dataset
def test_parquet_write_to_dataset_unsupported_keywords_in_legacy(tempdir):
table = pa.table({'a': [1, 2, 3]})
path = tempdir / 'data.parquet'

with pytest.raises(ValueError, match="schema"):
pq.write_to_dataset(table, path, use_legacy_dataset=True,
schema=pa.schema([
('a', pa.int32())
]))

with pytest.raises(ValueError, match="partitioning"):
pq.write_to_dataset(table, path, use_legacy_dataset=True,
partitioning=["a"])

with pytest.raises(ValueError, match="use_threads"):
pq.write_to_dataset(table, path, use_legacy_dataset=True,
use_threads=False)

with pytest.raises(ValueError, match="file_visitor"):
pq.write_to_dataset(table, path, use_legacy_dataset=True,
file_visitor=lambda x: x)
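As an illustration of the migration the deprecation message asks for, a sketch (not part of this PR) of replacing partition_filename_cb with basename_template; the table and output paths are placeholders:

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({'year': [2019, 2020], 'n': [1, 2]})

# Legacy behaviour (deprecated): specifying partition_filename_cb makes
# write_to_dataset fall back to the legacy implementation (which needs
# pandas for partitioned writes) and emits a FutureWarning. The callback
# receives the partition keys and returns the file name to use.
pq.write_to_dataset(table, 'dataset_legacy', partition_cols=['year'],
                    partition_filename_cb=lambda keys: 'data.parquet')

# New code path: a single template; '{i}' is replaced with an
# automatically incremented integer.
pq.write_to_dataset(table, 'dataset_new', partition_cols=['year'],
                    basename_template='data-{i}.parquet')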
5 changes: 3 additions & 2 deletions python/pyarrow/tests/test_dataset.py
@@ -971,7 +971,7 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None):
path = str(tempdir / "test_parquet_dataset")

# write_to_dataset currently requires pandas
pq.write_to_dataset(table, path,
pq.write_to_dataset(table, path, use_legacy_dataset=True,
partition_cols=["part"], chunk_size=chunk_size)
So here this fails when using the new dataset implementation, because dataset.write_dataset(..) doesn't support the parquet row_group_size keyword (to which chunk_size gets translated). The ParquetFileWriteOptions doesn't support this keyword.

On the parquet side, this is also the only keyword that is not passed to the ParquetWriter init (and thus to parquet's WriterProperties or ArrowWriterProperties), but to the actual write_table call. In C++ this can be seen at:

static ::arrow::Status Open(const ::arrow::Schema& schema, MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink,
std::shared_ptr<WriterProperties> properties,
std::shared_ptr<ArrowWriterProperties> arrow_properties,
std::unique_ptr<FileWriter>* writer);
virtual std::shared_ptr<::arrow::Schema> schema() const = 0;
/// \brief Write a Table to Parquet.
virtual ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size) = 0;

cc @westonpace do you remember if this has been discussed before how the row_group_size/chunk_size setting from Parquet fits into the dataset API?

jorisvandenbossche (Member) commented on Apr 7, 2022:
The dataset API now has a max_rows_per_group, I see, but that doesn't necessarily directly relate to Parquet row groups?

It's more generic about how many rows are written in one go, but effectively that also acts as a maximum parquet row group size (since those need to be written in one go)?
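A possible workaround sketch for the discussion above (not something this PR implements): bounding parquet row group sizes through the dataset writer's min_rows_per_group / max_rows_per_group keywords, assuming a pyarrow version recent enough to provide them:

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({'part': ['a'] * 10000, 'x': list(range(10000))})

# Instead of write_table's row_group_size/chunk_size, cap how many rows
# the dataset writer accumulates before handing them to the parquet
# writer; this effectively also caps the parquet row group size.
ds.write_dataset(table, 'dataset_row_groups',
                 format='parquet',
                 partitioning=['part'], partitioning_flavor='hive',
                 min_rows_per_group=1000,
                 max_rows_per_group=1000)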

Maybe we can open a follow-up JIRA for this one?

dataset = ds.dataset(
path, format="parquet", partitioning="hive", filesystem=filesystem
@@ -1247,7 +1247,8 @@ def _create_dataset_all_types(tempdir, chunk_size=None):
path = str(tempdir / "test_parquet_dataset_all_types")

# write_to_dataset currently requires pandas
pq.write_to_dataset(table, path, chunk_size=chunk_size)
pq.write_to_dataset(table, path, use_legacy_dataset=True,
chunk_size=chunk_size)

return table, ds.dataset(path, format="parquet", partitioning="hive")
