From e6fc9fe0e56b2811fc96ea632eb1a553c4cb20c4 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Fri, 5 Nov 2021 09:14:15 -1000
Subject: [PATCH 1/2] ARROW-13703: Start of a 6.0.1 patch to restore old dataset behavior

---
 python/pyarrow/_dataset.pyx                  | 1 +
 python/pyarrow/includes/libarrow_dataset.pxd | 6 ++++++
 2 files changed, 7 insertions(+)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 42d702095c2..818edb2e73d 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3381,6 +3381,7 @@ def _filesystemdataset_write(
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    c_options.existing_data_behavior = ExistingDataBehavior_OVERWRITE
     if file_visitor is not None:
         visit_args = {'base_dir': c_options.base_dir,
                       'file_visitor': file_visitor}
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 2527cde62b0..e3c8cc564b5 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -86,6 +86,11 @@ ctypedef void cb_writer_finish(dict, CFileWriter*)
 
 cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
+    cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior":
+        ExistingDataBehavior_ERROR" arrow::dataset::ExistingDataBehavior::kError"
+        ExistingDataBehavior_OVERWRITE" arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
+
+
     cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
         @staticmethod
         shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema)
@@ -278,6 +283,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             c_string basename_template
             function[cb_writer_finish_internal] writer_pre_finish
             function[cb_writer_finish_internal] writer_post_finish
+            ExistingDataBehavior existing_data_behavior
 
     cdef cppclass CFileSystemDataset \
         "arrow::dataset::FileSystemDataset"(CDataset):

From 551d68644193c8fdedda851efe6aaf0a6cc7a388 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Fri, 5 Nov 2021 13:32:43 -1000
Subject: [PATCH 2/2] ARROW-13703: Added existing data behavior bindings to python

---
 python/pyarrow/_dataset.pyx                  | 16 ++++++-
 python/pyarrow/dataset.py                    | 17 ++++++-
 python/pyarrow/includes/libarrow_dataset.pxd |  9 ++--
 python/pyarrow/tests/test_dataset.py         | 48 ++++++++++++++++++++
 4 files changed, 83 insertions(+), 7 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 818edb2e73d..00fdb112035 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3364,7 +3364,8 @@ def _filesystemdataset_write(
     Partitioning partitioning not None,
     FileWriteOptions file_options not None,
     int max_partitions,
-    object file_visitor
+    object file_visitor,
+    str existing_data_behavior not None
 ):
     """
     CFileSystemDataset.Write wrapper
@@ -3381,7 +3382,18 @@ def _filesystemdataset_write(
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
-    c_options.existing_data_behavior = ExistingDataBehavior_OVERWRITE
+    if existing_data_behavior == 'error':
+        c_options.existing_data_behavior = ExistingDataBehavior_ERROR
+    elif existing_data_behavior == 'overwrite':
+        c_options.existing_data_behavior = ExistingDataBehavior_OVERWRITE
+    elif existing_data_behavior == 'delete_matching':
+        c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING
+    else:
+        raise ValueError(
+            "existing_data_behavior must be one of 'error', 'overwrite' "
+            "or 'delete_matching'"
+        )
+
     if file_visitor is not None:
         visit_args = {'base_dir': c_options.base_dir,
                       'file_visitor': file_visitor}
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 70aeb150b1b..33ed68c2c5d 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor):
 def write_dataset(data, base_dir, basename_template=None, format=None,
                   partitioning=None, partitioning_flavor=None, schema=None,
                   filesystem=None, file_options=None, use_threads=True,
-                  max_partitions=None, file_visitor=None):
+                  max_partitions=None, file_visitor=None,
+                  existing_data_behavior='error'):
     """
     Write a dataset to a given format and partitioning.
 
@@ -798,6 +799,18 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
 
             def file_visitor(written_file):
                 visited_paths.append(written_file.path)
+    existing_data_behavior : 'error' | 'overwrite' | 'delete_matching'
+        Controls how the dataset will handle data that already exists in
+        the destination. The default behavior ('error') is to raise an
+        error if any data exists in the destination.
+
+        'overwrite' will ignore any existing data and will overwrite files
+        as needed.
+
+        'delete_matching' is useful when you are writing a partitioned
+        dataset. The first time each partition directory is encountered
+        the entire directory will be deleted. This allows you to overwrite
+        old partitions completely.
     """
     from pyarrow.fs import _resolve_filesystem_and_path
 
@@ -860,5 +873,5 @@ def file_visitor(written_file):
 
     _filesystemdataset_write(
         scanner, base_dir, basename_template, filesystem, partitioning,
-        file_options, max_partitions, file_visitor
+        file_options, max_partitions, file_visitor, existing_data_behavior
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index e3c8cc564b5..763fd21b6d1 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -87,9 +87,12 @@ ctypedef void cb_writer_finish(dict, CFileWriter*)
 cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior":
-        ExistingDataBehavior_ERROR" arrow::dataset::ExistingDataBehavior::kError"
-        ExistingDataBehavior_OVERWRITE" arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
-
+        ExistingDataBehavior_DELETE_MATCHING" \
+            arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions"
+        ExistingDataBehavior_OVERWRITE" \
+            arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore"
+        ExistingDataBehavior_ERROR" \
+            arrow::dataset::ExistingDataBehavior::kError"
 
     cdef cppclass CScanOptions "arrow::dataset::ScanOptions":
         @staticmethod
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e5590c4a6bf..3241c0ae039 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3480,6 +3480,54 @@ def test_write_dataset_with_dataset(tempdir):
     assert dict(load_back_table.to_pydict()) == table.to_pydict()
 
 
+@pytest.mark.pandas
+def test_write_dataset_existing_data(tempdir):
+    directory = tempdir / 'ds'
+    table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
+    partitioning = ds.partitioning(schema=pa.schema(
+        [pa.field('c', pa.int64())]), flavor='hive')
+
+    def compare_tables_ignoring_order(t1, t2):
+        df1 = t1.to_pandas().sort_values('b', ignore_index=True)
+        df2 = t2.to_pandas().sort_values('b', ignore_index=True)
+        assert df1.equals(df2)
+
+    # First write is ok
+    ds.write_dataset(table, directory, partitioning=partitioning, format='ipc')
+
+    table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]})
+
+    # Second write should fail
+    with pytest.raises(pa.ArrowInvalid):
+        ds.write_dataset(table, directory,
+                         partitioning=partitioning, format='ipc')
+
+    extra_table = pa.table({'b': ['e']})
+    extra_file = directory / 'c=2' / 'foo.arrow'
+    pyarrow.feather.write_feather(extra_table, extra_file)
+
+    # Should be ok and overwrite with 'overwrite' behavior
+    ds.write_dataset(table, directory, partitioning=partitioning,
+                     format='ipc', existing_data_behavior='overwrite')
+
+    overwritten = pa.table(
+        {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]})
+    readback = ds.dataset(tempdir, format='ipc',
+                          partitioning=partitioning).to_table()
+    compare_tables_ignoring_order(readback, overwritten)
+    assert extra_file.exists()
+
+    # Should be ok and delete matching partitions with 'delete_matching'
+    ds.write_dataset(table, directory, partitioning=partitioning,
+                     format='ipc', existing_data_behavior='delete_matching')
+
+    overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]})
+    readback = ds.dataset(tempdir, format='ipc',
+                          partitioning=partitioning).to_table()
+    compare_tables_ignoring_order(readback, overwritten)
+    assert not extra_file.exists()
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_write_dataset_partitioned_dict(tempdir):
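
Usage sketch (not part of the patch): the snippet below shows how the new
existing_data_behavior keyword is expected to be used once these patches are
applied. The base directory, table contents, and partitioning here are
illustrative assumptions only.

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
    part = ds.partitioning(pa.schema([('c', pa.int64())]), flavor='hive')

    # Default ('error'): raise if any data already exists in the destination.
    ds.write_dataset(table, 'out', format='ipc', partitioning=part)

    # 'overwrite': ignore existing data and overwrite files as needed;
    # unrelated files already in the destination are left in place.
    ds.write_dataset(table, 'out', format='ipc', partitioning=part,
                     existing_data_behavior='overwrite')

    # 'delete_matching': delete each partition directory the first time it is
    # written to, so stale files from earlier writes do not survive.
    ds.write_dataset(table, 'out', format='ipc', partitioning=part,
                     existing_data_behavior='delete_matching')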