From 7c0494e2d0d0573b87e0b943f1a543812f935d77 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 5 Nov 2021 13:32:43 -1000 Subject: [PATCH 1/6] ARROW-13703: Added existing data behavior bindings to python --- python/pyarrow/_dataset.pyx | 15 +++++- python/pyarrow/dataset.py | 17 ++++++- python/pyarrow/includes/libarrow_dataset.pxd | 8 ++++ python/pyarrow/tests/test_dataset.py | 48 ++++++++++++++++++++ 4 files changed, 85 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 42d702095c2..00fdb112035 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3364,7 +3364,8 @@ def _filesystemdataset_write( Partitioning partitioning not None, FileWriteOptions file_options not None, int max_partitions, - object file_visitor + object file_visitor, + str existing_data_behavior not None ): """ CFileSystemDataset.Write wrapper @@ -3381,6 +3382,18 @@ def _filesystemdataset_write( c_options.partitioning = partitioning.unwrap() c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) + if existing_data_behavior == 'error': + c_options.existing_data_behavior = ExistingDataBehavior_ERROR + elif existing_data_behavior == 'overwrite': + c_options.existing_data_behavior = ExistingDataBehavior_OVERWRITE + elif existing_data_behavior == 'delete_matching': + c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING + else: + raise ValueError( + ('existing_data_behavior must be one of error, overwrite or', + 'delete_matching') + ) + if file_visitor is not None: visit_args = {'base_dir': c_options.base_dir, 'file_visitor': file_visitor} diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 70aeb150b1b..33ed68c2c5d 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -736,7 +736,8 @@ def _ensure_write_partitioning(part, schema, flavor): def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, partitioning_flavor=None, schema=None, filesystem=None, file_options=None, use_threads=True, - max_partitions=None, file_visitor=None): + max_partitions=None, file_visitor=None, + existing_data_behavior='error'): """ Write a dataset to a given format and partitioning. @@ -798,6 +799,18 @@ def write_dataset(data, base_dir, basename_template=None, format=None, def file_visitor(written_file): visited_paths.append(written_file.path) + existing_data_behavior : 'error' | 'overwrite' | 'delete_matching' + Controls how the dataset will handle data that already exists in + the destination. The default behavior (error) is to raise an error + if any data exists in the destination. + + overwrite will ignore any existing data and will overwrite files + as needed. + + delete_matching is useful when you are writing a partitioned + dataset. The first time each partition directory is encountered + the entire directory will be deleted. This allows you to overwrite + old partitions completely. 
""" from pyarrow.fs import _resolve_filesystem_and_path @@ -860,5 +873,5 @@ def file_visitor(written_file): _filesystemdataset_write( scanner, base_dir, basename_template, filesystem, partitioning, - file_options, max_partitions, file_visitor + file_options, max_partitions, file_visitor, existing_data_behavior ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 2527cde62b0..42b8a137a09 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -86,6 +86,14 @@ ctypedef void cb_writer_finish(dict, CFileWriter*) cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: + cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior": + ExistingDataBehavior_DELETE_MATCHING" \ + arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions" + ExistingDataBehavior_OVERWRITE" \ + arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore" + ExistingDataBehavior_ERROR" \ + arrow::dataset::ExistingDataBehavior::kError" + cdef cppclass CScanOptions "arrow::dataset::ScanOptions": @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index b4959512f11..750e6b592c4 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3481,6 +3481,54 @@ def test_write_dataset_with_dataset(tempdir): assert dict(load_back_table.to_pydict()) == table.to_pydict() +@pytest.mark.pandas +def test_write_dataset_existing_data(tempdir): + directory = tempdir / 'ds' + table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]}) + partitioning = ds.partitioning(schema=pa.schema( + [pa.field('c', pa.int64())]), flavor='hive') + + def compare_tables_ignoring_order(t1, t2): + df1 = t1.to_pandas().sort_values('b', ignore_index=True) + df2 = t2.to_pandas().sort_values('b', ignore_index=True) + assert df1.equals(df2) + + # First write is ok + ds.write_dataset(table, directory, partitioning=partitioning, format='ipc') + + table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]}) + + # Second write should fail + with pytest.raises(pa.ArrowInvalid): + ds.write_dataset(table, directory, + partitioning=partitioning, format='ipc') + + extra_table = pa.table({'b': ['e']}) + extra_file = directory / 'c=2' / 'foo.arrow' + pyarrow.feather.write_feather(extra_table, extra_file) + + # Should be ok and overwrite with overwrite behaivor + ds.write_dataset(table, directory, partitioning=partitioning, + format='ipc', existing_data_behavior='overwrite') + + overwritten = pa.table( + {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]}) + readback = ds.dataset(tempdir, format='ipc', + partitioning=partitioning).to_table() + compare_tables_ignoring_order(readback, overwritten) + assert extra_file.exists() + + # Should be ok and delete matching with delete_matching + ds.write_dataset(table, directory, partitioning=partitioning, + format='ipc', existing_data_behavior='delete_matching') + + overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]}) + readback = ds.dataset(tempdir, format='ipc', + partitioning=partitioning).to_table() + compare_tables_ignoring_order(readback, overwritten) + assert not extra_file.exists() + + @pytest.mark.parquet @pytest.mark.pandas def test_write_dataset_partitioned_dict(tempdir): From 419a7e6b3137cc4d4b47f7650d6d700a569c5215 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 5 Nov 2021 14:46:54 -1000 Subject: [PATCH 2/6] 
ARROW-14620: Missed a line in the original commit --- python/pyarrow/includes/libarrow_dataset.pxd | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 42b8a137a09..763fd21b6d1 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -286,6 +286,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: c_string basename_template function[cb_writer_finish_internal] writer_pre_finish function[cb_writer_finish_internal] writer_post_finish + ExistingDataBehavior existing_data_behavior cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): From 43983688ec40b14bcc1b285e1541825eea4e6e28 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 5 Nov 2021 15:07:26 -1000 Subject: [PATCH 3/6] ARROW-14620: sort_values(ignore_index) is not supported on older versions of pandas. Using reset_index instead --- python/pyarrow/tests/test_dataset.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 750e6b592c4..b758a7d4a49 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3489,8 +3489,8 @@ def test_write_dataset_existing_data(tempdir): [pa.field('c', pa.int64())]), flavor='hive') def compare_tables_ignoring_order(t1, t2): - df1 = t1.to_pandas().sort_values('b', ignore_index=True) - df2 = t2.to_pandas().sort_values('b', ignore_index=True) + df1 = t1.to_pandas().sort_values('b').reset_index(drop=True) + df2 = t2.to_pandas().sort_values('b').reset_index(drop=True) assert df1.equals(df2) # First write is ok From a75eadb987f032b8b3ae460664f6ce393a3bd5c3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Sun, 7 Nov 2021 09:58:49 -1000 Subject: [PATCH 4/6] ARROW-14620: Renamed overwrite to overwrite_or_ignore and clarified docs --- python/pyarrow/_dataset.pyx | 9 +++++---- python/pyarrow/dataset.py | 10 +++++++--- python/pyarrow/includes/libarrow_dataset.pxd | 2 +- python/pyarrow/tests/test_dataset.py | 3 ++- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 00fdb112035..47271735e35 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3384,14 +3384,15 @@ def _filesystemdataset_write( c_options.basename_template = tobytes(basename_template) if existing_data_behavior == 'error': c_options.existing_data_behavior = ExistingDataBehavior_ERROR - elif existing_data_behavior == 'overwrite': - c_options.existing_data_behavior = ExistingDataBehavior_OVERWRITE + elif existing_data_behavior == 'overwrite_or_ignore': + c_options.existing_data_behavior =\ + ExistingDataBehavior_OVERWRITE_OR_IGNORE elif existing_data_behavior == 'delete_matching': c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING else: raise ValueError( - ('existing_data_behavior must be one of error, overwrite or', - 'delete_matching') + ('existing_data_behavior must be one of error, ', + 'overwrite_or_ignore or delete_matching') ) if file_visitor is not None: diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 33ed68c2c5d..1207e6ef980 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -799,13 +799,17 @@ def write_dataset(data, base_dir, basename_template=None, format=None, def file_visitor(written_file): visited_paths.append(written_file.path) - 
existing_data_behavior : 'error' | 'overwrite' | 'delete_matching' + existing_data_behavior : 'error' | 'overwrite_or_ignore' | + 'delete_matching' Controls how the dataset will handle data that already exists in the destination. The default behavior (error) is to raise an error if any data exists in the destination. - overwrite will ignore any existing data and will overwrite files - as needed. + overwrite_or_ignore will ignore any existing data and will + overwrite files with the same name as an output file. Other + existing files will be ignored. This behavior, in combination + with a unique basename_template for each write, will allow for + an append workflow. delete_matching is useful when you are writing a partitioned dataset. The first time each partition directory is encountered diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 763fd21b6d1..abc79fea813 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -89,7 +89,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef enum ExistingDataBehavior" arrow::dataset::ExistingDataBehavior": ExistingDataBehavior_DELETE_MATCHING" \ arrow::dataset::ExistingDataBehavior::kDeleteMatchingPartitions" - ExistingDataBehavior_OVERWRITE" \ + ExistingDataBehavior_OVERWRITE_OR_IGNORE" \ arrow::dataset::ExistingDataBehavior::kOverwriteOrIgnore" ExistingDataBehavior_ERROR" \ arrow::dataset::ExistingDataBehavior::kError" diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index b758a7d4a49..43cb67125f5 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3509,7 +3509,8 @@ def compare_tables_ignoring_order(t1, t2): # Should be ok and overwrite with overwrite behaivor ds.write_dataset(table, directory, partitioning=partitioning, - format='ipc', existing_data_behavior='overwrite') + format='ipc', + existing_data_behavior='overwrite_or_ignore') overwritten = pa.table( {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]}) From 5fc42df068047a07df23f8ad044222d369f3184e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 8 Nov 2021 08:34:35 -1000 Subject: [PATCH 5/6] ARROW-14620: Improving error message per PR suggestion --- python/pyarrow/_dataset.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 47271735e35..459c3b8fb76 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3391,8 +3391,8 @@ def _filesystemdataset_write( c_options.existing_data_behavior = ExistingDataBehavior_DELETE_MATCHING else: raise ValueError( - ('existing_data_behavior must be one of error, ', - 'overwrite_or_ignore or delete_matching') + ("existing_data_behavior must be one of 'error', ", + "'overwrite_or_ignore' or 'delete_matching'") ) if file_visitor is not None: From 9f7780b3d6943d06ac07525d15f24f863510d2df Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 8 Nov 2021 09:51:34 -1000 Subject: [PATCH 6/6] Apply suggestions from code review Co-authored-by: Joris Van den Bossche --- python/pyarrow/dataset.py | 10 +++++----- python/pyarrow/tests/test_dataset.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 1207e6ef980..42515a9f4bd 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -799,19 +799,19 @@ def write_dataset(data, base_dir, 
basename_template=None, format=None, def file_visitor(written_file): visited_paths.append(written_file.path) - existing_data_behavior : 'error' | 'overwrite_or_ignore' | - 'delete_matching' + existing_data_behavior : 'error' | 'overwrite_or_ignore' | \ +'delete_matching' Controls how the dataset will handle data that already exists in - the destination. The default behavior (error) is to raise an error + the destination. The default behavior ('error') is to raise an error if any data exists in the destination. - overwrite_or_ignore will ignore any existing data and will + 'overwrite_or_ignore' will ignore any existing data and will overwrite files with the same name as an output file. Other existing files will be ignored. This behavior, in combination with a unique basename_template for each write, will allow for an append workflow. - delete_matching is useful when you are writing a partitioned + 'delete_matching' is useful when you are writing a partitioned dataset. The first time each partition directory is encountered the entire directory will be deleted. This allows you to overwrite old partitions completely. diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 43cb67125f5..7d539551b65 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3507,7 +3507,7 @@ def compare_tables_ignoring_order(t1, t2): extra_file = directory / 'c=2' / 'foo.arrow' pyarrow.feather.write_feather(extra_table, extra_file) - # Should be ok and overwrite with overwrite behaivor + # Should be ok and overwrite with overwrite behavior ds.write_dataset(table, directory, partitioning=partitioning, format='ipc', existing_data_behavior='overwrite_or_ignore')
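
Usage sketch: the snippet below shows the three existing_data_behavior modes as they stand after patch 6, pulled together from the docstring and the test added in this series. It assumes a pyarrow build that includes these patches; the destination path 'ds_root' and the uuid-based basename_template are illustrative placeholders, not values taken from the patches.

    import uuid

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
    partitioning = ds.partitioning(
        schema=pa.schema([pa.field('c', pa.int64())]), flavor='hive')

    # Default behavior ('error'): succeeds only because 'ds_root' is empty.
    # A second write into the same destination would raise ArrowInvalid,
    # as exercised in test_write_dataset_existing_data.
    ds.write_dataset(table, 'ds_root', partitioning=partitioning,
                     format='ipc')

    # Append workflow: 'overwrite_or_ignore' skips the existence check, and
    # a basename_template made unique per write (it must still contain the
    # '{i}' placeholder) keeps new output files from replacing the files
    # written above.
    ds.write_dataset(table, 'ds_root', partitioning=partitioning,
                     format='ipc',
                     basename_template='part-{{i}}-{}.arrow'.format(
                         uuid.uuid4()),
                     existing_data_behavior='overwrite_or_ignore')

    # Partition replacement: 'delete_matching' deletes each partition
    # directory the first time this write touches it, so the rows appended
    # above are dropped and only this table's rows remain in those
    # partitions.
    ds.write_dataset(table, 'ds_root', partitioning=partitioning,
                     format='ipc',
                     existing_data_behavior='delete_matching')

Pairing 'overwrite_or_ignore' with a per-write unique basename_template gives the append workflow called out in the write_dataset docstring, while 'delete_matching' gives idempotent whole-partition replacement.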