From 67c63c7f885696cd3606e56cf3c1fb88613dc65e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 29 Jun 2021 18:28:24 -1000 Subject: [PATCH 1/7] ARROW-12364: Created writer_post_finish (similar to writer_pre_finish) to visit dataset-created files after Finish. Added a similar file_visitor concept to pyarrow which maps to writer_post_finish. Connected the legacy metadata_collector to the file_visitor so that parquet datasets created with use_legacy_dataset=True can support metadata_collector. --- cpp/src/arrow/dataset/file_base.cc | 3 +- cpp/src/arrow/dataset/file_base.h | 6 + python/pyarrow/_dataset.pyx | 57 +++++++++- python/pyarrow/dataset.py | 7 +- python/pyarrow/includes/libarrow_dataset.pxd | 15 +++ python/pyarrow/includes/libarrow_fs.pxd | 4 + python/pyarrow/parquet.py | 8 +- python/pyarrow/tests/test_dataset.py | 111 ++++++++++++++----- 8 files changed, 180 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 3a67ea48378..42ee4ffea0a 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -544,7 +544,8 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio for (const auto& part_queue : state.queues) { task_group->Append([&] { RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get())); - return part_queue.second->writer()->Finish(); + RETURN_NOT_OK(part_queue.second->writer()->Finish()); + return write_options.writer_post_finish(part_queue.second->writer().get()); }); } return task_group->Finish(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index f074e0f81da..4713a540665 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -360,6 +360,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions { return Status::OK(); }; + /// Callback to be invoked against all FileWriters after they have + /// called FileWriter::Finish(). 
+ std::function writer_post_finish = [](FileWriter*) { + return Status::OK(); + }; + const std::shared_ptr& format() const { return file_write_options->format(); } diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 562b7a5a3ad..46338dee01f 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3038,7 +3038,7 @@ def _get_partition_keys(Expression partition_expression): For example, an expression of - is converted to {'part': 'a', 'year': 2016} + is converted to {'part': 'A', 'year': 2016} """ cdef: CExpression expr = partition_expression.unwrap() @@ -3053,6 +3053,52 @@ def _get_partition_keys(Expression partition_expression): return out +ctypedef CParquetFileWriter* _CParquetFileWriterPtr + +cdef class WrittenFile(_Weakrefable): + """ + Metadata information about files written as + part of a dataset write operation + """ + + """The full path to the created file""" + cdef public str path + """If the file is a parquet file this will contain the parquet metadata""" + cdef public object metadata + + def __init__(self, path, metadata): + self.path = path + self.metadata = metadata + +cdef void _filesystemdataset_write_visitor( + dict visit_args, + CFileWriter* file_writer): + cdef: + str path + str base_dir + WrittenFile written_file + FileMetaData parquet_metadata + CParquetFileWriter* parquet_file_writer + + if file_writer == nullptr: + return + + parquet_metadata = None + path = frombytes(deref(file_writer).destination().path) + base_dir = frombytes(visit_args['base_dir']) + if deref(deref(file_writer).format()).type_name() == b"parquet": + parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer) + with nogil: + metadata = deref( + deref(parquet_file_writer).parquet_writer()).metadata() + if metadata: + parquet_metadata = FileMetaData() + parquet_metadata.init(metadata) + parquet_metadata.set_file_path(os.path.relpath(path, base_dir)) + written_file = WrittenFile(path, parquet_metadata) + visit_args['file_visitor'](written_file) + + def _filesystemdataset_write( Scanner data not None, object base_dir not None, @@ -3061,6 +3107,7 @@ def _filesystemdataset_write( Partitioning partitioning not None, FileWriteOptions file_options not None, int max_partitions, + object file_visitor ): """ CFileSystemDataset.Write wrapper @@ -3069,6 +3116,8 @@ def _filesystemdataset_write( CFileSystemDatasetWriteOptions c_options shared_ptr[CScanner] c_scanner vector[shared_ptr[CRecordBatch]] c_batches + dict visit_args + function[cb_writer_finish] c_post_finish_cb c_options.file_write_options = file_options.unwrap() c_options.filesystem = filesystem.unwrap() @@ -3076,6 +3125,12 @@ def _filesystemdataset_write( c_options.partitioning = partitioning.unwrap() c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) + c_post_finish_cb = _filesystemdataset_write_visitor + if file_visitor is not None: + visit_args = {'base_dir': c_options.base_dir, + 'file_visitor': file_visitor} + c_options.writer_post_finish = BindFunction[cb_writer_finish_internal]( + &_filesystemdataset_write_visitor, visit_args) c_scanner = data.unwrap() with nogil: diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index b93f492dd38..7d6f887edea 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -690,7 +690,7 @@ def _ensure_write_partitioning(scheme): def write_dataset(data, base_dir, basename_template=None, format=None, partitioning=None, schema=None, filesystem=None, file_options=None, 
use_threads=True, - use_async=False, max_partitions=None): + use_async=False, max_partitions=None, file_visitor=None): """ Write a dataset to a given format and partitioning. @@ -731,6 +731,9 @@ def write_dataset(data, base_dir, basename_template=None, format=None, (e.g. S3) max_partitions : int, default 1024 Maximum number of partitions any batch may be written into. + file_visitor : Function + If set, this function will be called with a WrittenFile instance + for each file created during the call. """ from pyarrow.fs import _resolve_filesystem_and_path @@ -784,5 +787,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None, _filesystemdataset_write( scanner, base_dir, basename_template, filesystem, partitioning, - file_options, max_partitions + file_options, max_partitions, file_visitor ) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 303285905cd..85d4ae2296c 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -81,6 +81,8 @@ cdef extern from "arrow/compute/exec/expression.h" \ CExtractKnownFieldValues "arrow::compute::ExtractKnownFieldValues"( const CExpression& partition_expression) +ctypedef CStatus cb_writer_finish_internal(CFileWriter*) +ctypedef void cb_writer_finish(dict, CFileWriter*) cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @@ -223,6 +225,17 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: const shared_ptr[CFileFormat]& format() const c_string type_name() const + cdef cppclass CFileWriter \ + "arrow::dataset::FileWriter": + const shared_ptr[CFileFormat]& format() const + const shared_ptr[CSchema]& schema() const + const shared_ptr[CFileWriteOptions]& options() const + const CFileLocator& destination() const + + cdef cppclass CParquetFileWriter \ + "arrow::dataset::ParquetFileWriter"(CFileWriter): + const shared_ptr[FileWriter]& parquet_writer() const + cdef cppclass CFileFormat "arrow::dataset::FileFormat": shared_ptr[CFragmentScanOptions] default_fragment_scan_options c_string type_name() const @@ -263,6 +276,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CPartitioning] partitioning int max_partitions c_string basename_template + function[cb_writer_finish_internal] writer_pre_finish + function[cb_writer_finish_internal] writer_post_finish cdef cppclass CFileSystemDataset \ "arrow::dataset::FileSystemDataset"(CDataset): diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd index 52ef97e5757..eef3757bff0 100644 --- a/python/pyarrow/includes/libarrow_fs.pxd +++ b/python/pyarrow/includes/libarrow_fs.pxd @@ -52,6 +52,10 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil: c_bool allow_not_found c_bool recursive + cdef cppclass CFileLocator "arrow::fs::FileLocator": + shared_ptr[CFileSystem] filesystem + c_string path + cdef cppclass CFileSystem "arrow::fs::FileSystem": shared_ptr[CFileSystem] shared_from_this() c_string type_name() const diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index 22763680cd1..ec303cb5123 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1958,8 +1958,11 @@ def write_to_dataset(table, root_path, partition_cols=None, "implementation." 
) metadata_collector = kwargs.pop('metadata_collector', None) + file_visitor = None if metadata_collector is not None: - raise ValueError(msg.format("metadata_collector")) + def file_visitor(written_file): + if written_file.metadata: + metadata_collector.append(written_file.metadata) if partition_filename_cb is not None: raise ValueError(msg.format("partition_filename_cb")) @@ -1979,7 +1982,8 @@ def write_to_dataset(table, root_path, partition_cols=None, ds.write_dataset( table, root_path, filesystem=filesystem, format=parquet_format, file_options=write_options, schema=schema, - partitioning=partitioning, use_threads=use_threads) + partitioning=partitioning, use_threads=use_threads, + file_visitor=file_visitor) return fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 3c79d1281cd..d1b1bcffae1 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2706,33 +2706,40 @@ def test_feather_format(tempdir, dataset_reader): dataset_reader.to_table(ds.dataset(basedir, format="feather")) -def _create_parquet_dataset_simple(root_path): +def _create_parquet_dataset_simple(root_path, use_legacy_dataset): import pyarrow.parquet as pq metadata_collector = [] - for i in range(4): - table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) - pq.write_to_dataset( - table, str(root_path), metadata_collector=metadata_collector - ) + f1_vals = [item for chunk in range(4) for item in [chunk] * 10] + + table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)}) + pq.write_to_dataset( + table, str(root_path), partition_cols=['f1'], + use_legacy_dataset=use_legacy_dataset, + metadata_collector=metadata_collector + ) + + partitionless_schema = pa.schema([pa.field('f2', pa.float64())]) metadata_path = str(root_path / '_metadata') # write _metadata file pq.write_metadata( - table.schema, metadata_path, + partitionless_schema, metadata_path, metadata_collector=metadata_collector ) - return metadata_path, table + return metadata_path, partitionless_schema @pytest.mark.parquet @pytest.mark.pandas # write_to_dataset currently requires pandas -def test_parquet_dataset_factory(tempdir): +@pytest.mark.parametrize('use_legacy_dataset', [False, True]) +def test_parquet_dataset_factory(tempdir, use_legacy_dataset): root_path = tempdir / "test_parquet_dataset" - metadata_path, table = _create_parquet_dataset_simple(root_path) + metadata_path, partitionless_schema = _create_parquet_dataset_simple( + root_path, use_legacy_dataset) dataset = ds.parquet_dataset(metadata_path) - assert dataset.schema.equals(table.schema) + assert dataset.schema.equals(partitionless_schema) assert len(dataset.files) == 4 result = dataset.to_table() assert result.num_rows == 40 @@ -2740,13 +2747,15 @@ def test_parquet_dataset_factory(tempdir): @pytest.mark.parquet @pytest.mark.pandas -def test_parquet_dataset_factory_invalid(tempdir): +@pytest.mark.parametrize('use_legacy_dataset', [False, True]) +def test_parquet_dataset_factory_invalid(tempdir, use_legacy_dataset): root_path = tempdir / "test_parquet_dataset_invalid" - metadata_path, table = _create_parquet_dataset_simple(root_path) + metadata_path, partitionless_schema = _create_parquet_dataset_simple( + root_path, use_legacy_dataset) # remove one of the files - list(root_path.glob("*.parquet"))[0].unlink() + list(root_path.glob("f1=1/*.parquet"))[0].unlink() dataset = ds.parquet_dataset(metadata_path) - assert 
dataset.schema.equals(table.schema) + assert dataset.schema.equals(partitionless_schema) assert len(dataset.files) == 4 with pytest.raises(FileNotFoundError): dataset.to_table() @@ -2829,7 +2838,7 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs): # created with ParquetDatasetFactory from a _metadata file root_path = tempdir / "test_parquet_dataset_lazy_filtering" - metadata_path, _ = _create_parquet_dataset_simple(root_path) + metadata_path, _ = _create_parquet_dataset_simple(root_path, True) # creating the dataset should only open the metadata file with assert_opens([metadata_path]): @@ -2844,11 +2853,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs): # filtering fragments should not open any file with assert_opens([]): - list(dataset.get_fragments(ds.field("f1") > 15)) + list(dataset.get_fragments(ds.field("f2") > 15)) # splitting by row group should still not open any file with assert_opens([]): - fragments[0].split_by_row_group(ds.field("f1") > 15) + fragments[0].split_by_row_group(ds.field("f2") > 15) # ensuring metadata of splitted fragment should also not open any file with assert_opens([]): @@ -3129,10 +3138,24 @@ def test_write_dataset_use_threads(tempdir): pa.schema([("part", pa.string())]), flavor="hive") target1 = tempdir / 'partitioned1' + paths_written = [] + + def file_visitor(written_file): + paths_written.append(written_file.path) + ds.write_dataset( dataset, target1, format="feather", partitioning=partitioning, - use_threads=True + use_threads=True, file_visitor=file_visitor ) + expected_paths = [ + target1 / 'part=a' / 'part-0.feather', + target1 / 'part=a' / 'part-1.feather', + target1 / 'part=b' / 'part-0.feather', + target1 / 'part=b' / 'part-1.feather' + ] + for path in paths_written: + assert pathlib.Path(path) in expected_paths + target2 = tempdir / 'partitioned2' ds.write_dataset( dataset, target2, format="feather", partitioning=partitioning, @@ -3164,19 +3187,29 @@ def test_write_table(tempdir): # with partitioning base_dir = tempdir / 'partitioned' + expected_paths = [ + base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow", + base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow" + ] + + visited_paths = [] + + def file_visitor(written_file): + nonlocal visited_paths + visited_paths.append(written_file.path) + partitioning = ds.partitioning( pa.schema([("part", pa.string())]), flavor="hive") ds.write_dataset(table, base_dir, format="feather", basename_template='dat_{i}.arrow', - partitioning=partitioning) + partitioning=partitioning, file_visitor=file_visitor) file_paths = list(base_dir.rglob("*")) - expected_paths = [ - base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow", - base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow" - ] assert set(file_paths) == set(expected_paths) result = ds.dataset(base_dir, format="ipc", partitioning=partitioning) assert result.to_table().equals(table) + assert len(visited_paths) == 2 + for visited_path in visited_paths: + assert pathlib.Path(visited_path) in expected_paths def test_write_table_multiple_fragments(tempdir): @@ -3303,8 +3336,19 @@ def test_write_dataset_parquet(tempdir): # using default "parquet" format string + files_correct_metadata = 0 + + def file_visitor(written_file): + nonlocal files_correct_metadata + if (written_file.metadata is not None and + written_file.metadata.num_columns == 3): + files_correct_metadata += 1 + base_dir = tempdir / 'parquet_dataset' - ds.write_dataset(table, base_dir, format="parquet") + ds.write_dataset(table, base_dir, 
format="parquet", + file_visitor=file_visitor) + + assert files_correct_metadata == 1 # check that all files are present file_paths = list(base_dir.rglob("*")) expected_paths = [base_dir / "part-0.parquet"] @@ -3457,3 +3501,20 @@ def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader): ) table = dataset_reader.to_table(fsds) assert table.schema == schema + + +def test_visit_strings_adhoc(): + import pyarrow._dataset as _ds + + strings = ['a', 'b', 'c'] + visited = [] + _ds._visit_strings(strings, visited.append) + + assert visited == strings + + with pytest.raises(ValueError, match="wtf"): + def raise_on_b(s): + if s == 'b': + raise ValueError('wtf') + + _ds._visit_strings(strings, raise_on_b) From a393dd20178ea526138def8e7924fd18ea1417a8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 2 Jul 2021 12:26:40 -1000 Subject: [PATCH 2/7] ARROW-12364: Addressing PR comments --- python/pyarrow/_dataset.pyx | 7 +----- python/pyarrow/tests/test_dataset.py | 35 ++++++++++------------------ 2 files changed, 13 insertions(+), 29 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 46338dee01f..454d45b53a7 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3080,18 +3080,15 @@ cdef void _filesystemdataset_write_visitor( FileMetaData parquet_metadata CParquetFileWriter* parquet_file_writer - if file_writer == nullptr: - return - parquet_metadata = None path = frombytes(deref(file_writer).destination().path) - base_dir = frombytes(visit_args['base_dir']) if deref(deref(file_writer).format()).type_name() == b"parquet": parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer) with nogil: metadata = deref( deref(parquet_file_writer).parquet_writer()).metadata() if metadata: + base_dir = frombytes(visit_args['base_dir']) parquet_metadata = FileMetaData() parquet_metadata.init(metadata) parquet_metadata.set_file_path(os.path.relpath(path, base_dir)) @@ -3117,7 +3114,6 @@ def _filesystemdataset_write( shared_ptr[CScanner] c_scanner vector[shared_ptr[CRecordBatch]] c_batches dict visit_args - function[cb_writer_finish] c_post_finish_cb c_options.file_write_options = file_options.unwrap() c_options.filesystem = filesystem.unwrap() @@ -3125,7 +3121,6 @@ def _filesystemdataset_write( c_options.partitioning = partitioning.unwrap() c_options.max_partitions = max_partitions c_options.basename_template = tobytes(basename_template) - c_post_finish_cb = _filesystemdataset_write_visitor if file_visitor is not None: visit_args = {'base_dir': c_options.base_dir, 'file_visitor': file_visitor} diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index d1b1bcffae1..b9f2e13b462 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3141,20 +3141,26 @@ def test_write_dataset_use_threads(tempdir): paths_written = [] def file_visitor(written_file): + print(f'Visiting {written_file.path}') paths_written.append(written_file.path) ds.write_dataset( dataset, target1, format="feather", partitioning=partitioning, use_threads=True, file_visitor=file_visitor ) - expected_paths = [ + + # Since it is a multi-threaded write there is no way to know which + # directory gets part-0 and which gets part-1 + expected_paths_a = { target1 / 'part=a' / 'part-0.feather', - target1 / 'part=a' / 'part-1.feather', - target1 / 'part=b' / 'part-0.feather', target1 / 'part=b' / 'part-1.feather' - ] - for path in paths_written: - assert pathlib.Path(path) in expected_paths + } + 
expected_paths_b = { + target1 / 'part=a' / 'part-1.feather', + target1 / 'part=b' / 'part-0.feather' + } + paths_written_set = set(map(pathlib.Path, paths_written)) + assert paths_written_set in [expected_paths_a, expected_paths_b] target2 = tempdir / 'partitioned2' ds.write_dataset( @@ -3501,20 +3507,3 @@ def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader): ) table = dataset_reader.to_table(fsds) assert table.schema == schema - - -def test_visit_strings_adhoc(): - import pyarrow._dataset as _ds - - strings = ['a', 'b', 'c'] - visited = [] - _ds._visit_strings(strings, visited.append) - - assert visited == strings - - with pytest.raises(ValueError, match="wtf"): - def raise_on_b(s): - if s == 'b': - raise ValueError('wtf') - - _ds._visit_strings(strings, raise_on_b) From 1adff82f3d85765152e6905c58852b5741822342 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 2 Jul 2021 12:28:01 -1000 Subject: [PATCH 3/7] ARROW-12364: Added comment about why we use post_finish --- python/pyarrow/_dataset.pyx | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 454d45b53a7..46fc935468d 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3124,6 +3124,8 @@ def _filesystemdataset_write( if file_visitor is not None: visit_args = {'base_dir': c_options.base_dir, 'file_visitor': file_visitor} + # Need to use post_finish because parquet metadata is not available + # until after Finish has been called c_options.writer_post_finish = BindFunction[cb_writer_finish_internal]( &_filesystemdataset_write_visitor, visit_args) From b1af63b0e8128462df96a41c5a418701ecbb201f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Jul 2021 00:30:17 -1000 Subject: [PATCH 4/7] ARROW-12364: Addressed PR comments --- python/pyarrow/_dataset.pyx | 6 +++- python/pyarrow/dataset.py | 5 +++- python/pyarrow/parquet.py | 3 +- python/pyarrow/tests/test_dataset.py | 45 +++++++++++++++++----------- 4 files changed, 38 insertions(+), 21 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 46fc935468d..2a8eaa04c29 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -3063,7 +3063,11 @@ cdef class WrittenFile(_Weakrefable): """The full path to the created file""" cdef public str path - """If the file is a parquet file this will contain the parquet metadata""" + """ + If the file is a parquet file this will contain the parquet metadata. + This metadata will have the file path attribute set to the path of + the written file. + """ cdef public object metadata def __init__(self, path, metadata): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 7d6f887edea..324201a9787 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -733,7 +733,10 @@ def write_dataset(data, base_dir, basename_template=None, format=None, Maximum number of partitions any batch may be written into. file_visitor : Function If set, this function will be called with a WrittenFile instance - for each file created during the call. + for each file created during the call. This object will contain + the path and (if the dataset is a parquet dataset) the parquet + metadata. This can be used to collect the paths or metadata of + all written files. 
""" from pyarrow.fs import _resolve_filesystem_and_path diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py index ec303cb5123..c578661851f 100644 --- a/python/pyarrow/parquet.py +++ b/python/pyarrow/parquet.py @@ -1961,8 +1961,7 @@ def write_to_dataset(table, root_path, partition_cols=None, file_visitor = None if metadata_collector is not None: def file_visitor(written_file): - if written_file.metadata: - metadata_collector.append(written_file.metadata) + metadata_collector.append(written_file.metadata) if partition_filename_cb is not None: raise ValueError(msg.format("partition_filename_cb")) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index b9f2e13b462..c337b7631f3 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2706,21 +2706,22 @@ def test_feather_format(tempdir, dataset_reader): dataset_reader.to_table(ds.dataset(basedir, format="feather")) -def _create_parquet_dataset_simple(root_path, use_legacy_dataset): +def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True): import pyarrow.parquet as pq metadata_collector = [] f1_vals = [item for chunk in range(4) for item in [chunk] * 10] + f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10] - table = pa.table({'f1': f1_vals, 'f2': np.random.randn(40)}) + table = pa.table({'f1': f1_vals, 'f2': f2_vals}) pq.write_to_dataset( table, str(root_path), partition_cols=['f1'], use_legacy_dataset=use_legacy_dataset, metadata_collector=metadata_collector ) - partitionless_schema = pa.schema([pa.field('f2', pa.float64())]) + partitionless_schema = pa.schema([pa.field('f2', pa.int64())]) metadata_path = str(root_path / '_metadata') # write _metadata file @@ -2838,7 +2839,7 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs): # created with ParquetDatasetFactory from a _metadata file root_path = tempdir / "test_parquet_dataset_lazy_filtering" - metadata_path, _ = _create_parquet_dataset_simple(root_path, True) + metadata_path, _ = _create_parquet_dataset_simple(root_path) # creating the dataset should only open the metadata file with assert_opens([metadata_path]): @@ -3141,7 +3142,6 @@ def test_write_dataset_use_threads(tempdir): paths_written = [] def file_visitor(written_file): - print(f'Visiting {written_file.path}') paths_written.append(written_file.path) ds.write_dataset( @@ -3201,7 +3201,6 @@ def test_write_table(tempdir): visited_paths = [] def file_visitor(written_file): - nonlocal visited_paths visited_paths.append(written_file.path) partitioning = ds.partitioning( @@ -3342,19 +3341,9 @@ def test_write_dataset_parquet(tempdir): # using default "parquet" format string - files_correct_metadata = 0 - - def file_visitor(written_file): - nonlocal files_correct_metadata - if (written_file.metadata is not None and - written_file.metadata.num_columns == 3): - files_correct_metadata += 1 - base_dir = tempdir / 'parquet_dataset' - ds.write_dataset(table, base_dir, format="parquet", - file_visitor=file_visitor) + ds.write_dataset(table, base_dir, format="parquet") - assert files_correct_metadata == 1 # check that all files are present file_paths = list(base_dir.rglob("*")) expected_paths = [base_dir / "part-0.parquet"] @@ -3399,6 +3388,28 @@ def test_write_dataset_csv(tempdir): assert result.equals(table) +@pytest.mark.parquet +def test_write_dataset_parquet_file_visitor(tempdir): + table = pa.table([ + pa.array(range(20)), pa.array(np.random.randn(20)), + pa.array(np.repeat(['a', 'b'], 10)) + ], 
names=["f1", "f2", "part"]) + + visitor_called = False + + def file_visitor(written_file): + nonlocal visitor_called + if (written_file.metadata is not None and + written_file.metadata.num_columns == 3): + visitor_called = True + + base_dir = tempdir / 'parquet_dataset' + ds.write_dataset(table, base_dir, format="parquet", + file_visitor=file_visitor) + + assert visitor_called + + @pytest.mark.parquet @pytest.mark.pandas def test_write_dataset_arrow_schema_metadata(tempdir): From 63ebb03cd0dcac95a7950aa42f8985d7df1ffce0 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 8 Jul 2021 14:48:25 -1000 Subject: [PATCH 5/7] ARROW-12364: Addressing PR comments --- python/pyarrow/dataset.py | 20 ++++- python/pyarrow/tests/test_dataset.py | 119 ++++++++++++++++++++------- 2 files changed, 107 insertions(+), 32 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 324201a9787..9c601e30cf9 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -733,10 +733,22 @@ def write_dataset(data, base_dir, basename_template=None, format=None, Maximum number of partitions any batch may be written into. file_visitor : Function If set, this function will be called with a WrittenFile instance - for each file created during the call. This object will contain - the path and (if the dataset is a parquet dataset) the parquet - metadata. This can be used to collect the paths or metadata of - all written files. + for each file created during the call. This object will have both + a path attribute and a metadata attribute. + + The path attribute will a str containing the absolute path to + the created file. + + The metadata attribute will be the parquet metadata of the file. + This metadata will have the file path attribute set and can be used + to build a _metadata file. The metadata attribute will be None if + the format is not parquet. 
+ + # Example visitor which simple collects the filenames created + visited_paths = [] + + def file_visitor(written_file): + visited_paths.append(written_file.path) """ from pyarrow.fs import _resolve_filesystem_and_path diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index c337b7631f3..b9064f7e828 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -2706,57 +2706,80 @@ def test_feather_format(tempdir, dataset_reader): dataset_reader.to_table(ds.dataset(basedir, format="feather")) -def _create_parquet_dataset_simple(root_path, use_legacy_dataset=True): +def _create_parquet_dataset_simple(root_path): + """ + Creates a simple (flat files, no nested partitioning) Parquet dataset + """ import pyarrow.parquet as pq metadata_collector = [] - f1_vals = [item for chunk in range(4) for item in [chunk] * 10] - f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10] - - table = pa.table({'f1': f1_vals, 'f2': f2_vals}) - pq.write_to_dataset( - table, str(root_path), partition_cols=['f1'], - use_legacy_dataset=use_legacy_dataset, - metadata_collector=metadata_collector - ) - - partitionless_schema = pa.schema([pa.field('f2', pa.int64())]) + for i in range(4): + table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) + pq.write_to_dataset( + table, str(root_path), metadata_collector=metadata_collector + ) metadata_path = str(root_path / '_metadata') # write _metadata file pq.write_metadata( - partitionless_schema, metadata_path, + table.schema, metadata_path, metadata_collector=metadata_collector ) - return metadata_path, partitionless_schema + return metadata_path, table @pytest.mark.parquet @pytest.mark.pandas # write_to_dataset currently requires pandas -@pytest.mark.parametrize('use_legacy_dataset', [False, True]) -def test_parquet_dataset_factory(tempdir, use_legacy_dataset): +def test_parquet_dataset_factory(tempdir): root_path = tempdir / "test_parquet_dataset" - metadata_path, partitionless_schema = _create_parquet_dataset_simple( - root_path, use_legacy_dataset) + metadata_path, table = _create_parquet_dataset_simple(root_path) dataset = ds.parquet_dataset(metadata_path) - assert dataset.schema.equals(partitionless_schema) + assert dataset.schema.equals(table.schema) assert len(dataset.files) == 4 result = dataset.to_table() assert result.num_rows == 40 @pytest.mark.parquet -@pytest.mark.pandas +@pytest.mark.pandas # write_to_dataset currently requires pandas @pytest.mark.parametrize('use_legacy_dataset', [False, True]) -def test_parquet_dataset_factory_invalid(tempdir, use_legacy_dataset): +def test_parquet_dataset_factory_roundtrip(tempdir, use_legacy_dataset): + # Simple test to ensure we can roundtrip dataset to + # _metadata/common_metadata and back. A more complex test + # using partitioning will have to wait for ARROW-13269. 
The + # above test (test_parquet_dataset_factory) will not work + # when legacy is False as there is no "append" equivalent in + # the new dataset until ARROW-12358 + import pyarrow.parquet as pq + root_path = tempdir / "test_parquet_dataset" + table = pa.table({'f1': [0] * 10, 'f2': np.random.randn(10)}) + metadata_collector = [] + pq.write_to_dataset( + table, str(root_path), metadata_collector=metadata_collector, + use_legacy_dataset=use_legacy_dataset + ) + metadata_path = str(root_path / '_metadata') + # write _metadata file + pq.write_metadata( + table.schema, metadata_path, + metadata_collector=metadata_collector + ) + dataset = ds.parquet_dataset(metadata_path) + assert dataset.schema.equals(table.schema) + result = dataset.to_table() + assert result.num_rows == 10 + + +@pytest.mark.parquet +@pytest.mark.pandas +def test_parquet_dataset_factory_invalid(tempdir): root_path = tempdir / "test_parquet_dataset_invalid" - metadata_path, partitionless_schema = _create_parquet_dataset_simple( - root_path, use_legacy_dataset) + metadata_path, table = _create_parquet_dataset_simple(root_path) # remove one of the files - list(root_path.glob("f1=1/*.parquet"))[0].unlink() + list(root_path.glob("*.parquet"))[0].unlink() dataset = ds.parquet_dataset(metadata_path) - assert dataset.schema.equals(partitionless_schema) + assert dataset.schema.equals(table.schema) assert len(dataset.files) == 4 with pytest.raises(FileNotFoundError): dataset.to_table() @@ -2854,11 +2877,11 @@ def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs): # filtering fragments should not open any file with assert_opens([]): - list(dataset.get_fragments(ds.field("f2") > 15)) + list(dataset.get_fragments(ds.field("f1") > 15)) # splitting by row group should still not open any file with assert_opens([]): - fragments[0].split_by_row_group(ds.field("f2") > 15) + fragments[0].split_by_row_group(ds.field("f1") > 15) # ensuring metadata of splitted fragment should also not open any file with assert_opens([]): @@ -3343,7 +3366,6 @@ def test_write_dataset_parquet(tempdir): base_dir = tempdir / 'parquet_dataset' ds.write_dataset(table, base_dir, format="parquet") - # check that all files are present file_paths = list(base_dir.rglob("*")) expected_paths = [base_dir / "part-0.parquet"] @@ -3410,6 +3432,47 @@ def file_visitor(written_file): assert visitor_called +def test_partition_dataset_parquet_file_visitor(tempdir): + f1_vals = [item for chunk in range(4) for item in [chunk] * 10] + f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10] + table = pa.table({'f1': f1_vals, 'f2': f2_vals, + 'part': np.repeat(['a', 'b'], 20)}) + + root_path = tempdir / 'partitioned' + partitioning = ds.partitioning( + pa.schema([("part", pa.string())]), flavor="hive") + + paths_written = [] + + sample_metadata = None + + def file_visitor(written_file): + nonlocal sample_metadata + if written_file.metadata: + sample_metadata = written_file.metadata + paths_written.append(written_file.path) + + ds.write_dataset( + table, root_path, format="parquet", partitioning=partitioning, + use_threads=True, file_visitor=file_visitor + ) + + # Since it is a multi-threaded write there is no way to know which + # directory gets part-0 and which gets part-1 + expected_paths_a = { + root_path / 'part=a' / 'part-0.parquet', + root_path / 'part=b' / 'part-1.parquet' + } + expected_paths_b = { + root_path / 'part=a' / 'part-1.parquet', + root_path / 'part=b' / 'part-0.parquet' + } + paths_written_set = set(map(pathlib.Path, paths_written)) + assert 
paths_written_set in [expected_paths_a, expected_paths_b] + assert sample_metadata is not None + assert sample_metadata.num_columns == 2 + + @pytest.mark.parquet @pytest.mark.pandas def test_write_dataset_arrow_schema_metadata(tempdir): From 68a6d39ca77ddc4123f40c7797c4fc4536baee41 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 9 Jul 2021 09:47:06 -1000 Subject: [PATCH 6/7] Update python/pyarrow/dataset.py Co-authored-by: Joris Van den Bossche --- python/pyarrow/dataset.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 9c601e30cf9..c612c777795 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -744,11 +744,12 @@ def write_dataset(data, base_dir, basename_template=None, format=None, to build a _metadata file. The metadata attribute will be None if the format is not parquet. - # Example visitor which simple collects the filenames created - visited_paths = [] + Example visitor which simple collects the filenames created:: - def file_visitor(written_file): - visited_paths.append(written_file.path) + visited_paths = [] + + def file_visitor(written_file): + visited_paths.append(written_file.path) """ from pyarrow.fs import _resolve_filesystem_and_path From 5c49002daef1301898c383188a47c82ead86272f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 9 Jul 2021 09:47:15 -1000 Subject: [PATCH 7/7] Update python/pyarrow/dataset.py Co-authored-by: Joris Van den Bossche --- python/pyarrow/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index c612c777795..8b5799e6da2 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -736,7 +736,7 @@ def write_dataset(data, base_dir, basename_template=None, format=None, for each file created during the call. This object will have both a path attribute and a metadata attribute. - The path attribute will a str containing the absolute path to + The path attribute will be a string containing the path to the created file. The metadata attribute will be the parquet metadata of the file.
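
Taken together, the patches above expose the new writer_post_finish hook to Python users as a file_visitor callback on ds.write_dataset, with each created file reported as a WrittenFile carrying a path and (for parquet output) a metadata attribute. The following is a minimal usage sketch, not part of the patch series itself; the toy table and the 'dataset_root' directory are made up for illustration, and only calls that appear in the diffs and tests above are used:

    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({'f1': [0, 0, 1, 1], 'f2': [1.0, 2.0, 3.0, 4.0]})

    collected_paths = []
    collected_metadata = []

    def file_visitor(written_file):
        # written_file.path is the path of the created file;
        # written_file.metadata is the parquet FileMetaData (None for
        # non-parquet formats) with its file_path already set relative
        # to the dataset root directory.
        collected_paths.append(written_file.path)
        if written_file.metadata is not None:
            collected_metadata.append(written_file.metadata)

    ds.write_dataset(table, 'dataset_root', format='parquet',
                     file_visitor=file_visitor)

    # No partitioning, so a single part-0.parquet file is expected.
    assert len(collected_paths) == 1
    assert collected_metadata[0].num_columns == 2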
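
The parquet.py change in patch 1 wires the existing metadata_collector keyword of pq.write_to_dataset to this same visitor when the new (non-legacy) dataset implementation is selected, instead of raising. A minimal sketch of the resulting _metadata roundtrip, mirroring test_parquet_dataset_factory_roundtrip above; the data values and the 'dataset_root' path are again placeholders, and (per the test markers) write_to_dataset may still require pandas:

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    table = pa.table({'f1': [0] * 10, 'f2': list(range(10))})
    root_path = 'dataset_root'

    # Collect per-file parquet metadata while writing the dataset.
    metadata_collector = []
    pq.write_to_dataset(table, root_path, use_legacy_dataset=False,
                        metadata_collector=metadata_collector)

    # Combine the collected metadata into a _metadata sidecar file.
    metadata_path = root_path + '/_metadata'
    pq.write_metadata(table.schema, metadata_path,
                      metadata_collector=metadata_collector)

    # The dataset can then be reconstructed from _metadata alone.
    dataset = ds.parquet_dataset(metadata_path)
    assert dataset.to_table().num_rows == 10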