diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 3a67ea48378..42ee4ffea0a 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -544,7 +544,8 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
   for (const auto& part_queue : state.queues) {
     task_group->Append([&] {
       RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get()));
-      return part_queue.second->writer()->Finish();
+      RETURN_NOT_OK(part_queue.second->writer()->Finish());
+      return write_options.writer_post_finish(part_queue.second->writer().get());
     });
   }
   return task_group->Finish();
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index f074e0f81da..4713a540665 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -360,6 +360,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
     return Status::OK();
   };
 
+  /// Callback to be invoked against all FileWriters after
+  /// FileWriter::Finish() has been called on them.
+  std::function<Status(FileWriter*)> writer_post_finish = [](FileWriter*) {
+    return Status::OK();
+  };
+
   const std::shared_ptr<FileFormat>& format() const {
     return file_write_options->format();
   }
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 562b7a5a3ad..2a8eaa04c29 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -3038,7 +3038,7 @@ def _get_partition_keys(Expression partition_expression):
     For example, an expression of
 
-    is converted to {'part': 'a', 'year': 2016}
+    is converted to {'part': 'A', 'year': 2016}
     """
     cdef:
         CExpression expr = partition_expression.unwrap()
@@ -3053,6 +3053,53 @@ def _get_partition_keys(Expression partition_expression):
     return out
 
 
+ctypedef CParquetFileWriter* _CParquetFileWriterPtr
+
+cdef class WrittenFile(_Weakrefable):
+    """
+    Metadata information about files written as
+    part of a dataset write operation
+    """
+
+    """The full path to the created file"""
+    cdef public str path
+    """
+    If the file is a parquet file this will contain the parquet metadata.
+    This metadata will have the file path attribute set to the path of
+    the written file.
+    """
+    cdef public object metadata
+
+    def __init__(self, path, metadata):
+        self.path = path
+        self.metadata = metadata
+
+cdef void _filesystemdataset_write_visitor(
+        dict visit_args,
+        CFileWriter* file_writer):
+    cdef:
+        str path
+        str base_dir
+        WrittenFile written_file
+        FileMetaData parquet_metadata
+        CParquetFileWriter* parquet_file_writer
+
+    parquet_metadata = None
+    path = frombytes(deref(file_writer).destination().path)
+    if deref(deref(file_writer).format()).type_name() == b"parquet":
+        parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
+        with nogil:
+            metadata = deref(
+                deref(parquet_file_writer).parquet_writer()).metadata()
+        if metadata:
+            base_dir = frombytes(visit_args['base_dir'])
+            parquet_metadata = FileMetaData()
+            parquet_metadata.init(metadata)
+            parquet_metadata.set_file_path(os.path.relpath(path, base_dir))
+    written_file = WrittenFile(path, parquet_metadata)
+    visit_args['file_visitor'](written_file)
+
+
 def _filesystemdataset_write(
     Scanner data not None,
     object base_dir not None,
@@ -3061,6 +3108,7 @@ def _filesystemdataset_write(
     Partitioning partitioning not None,
     FileWriteOptions file_options not None,
     int max_partitions,
+    object file_visitor
 ):
     """
     CFileSystemDataset.Write wrapper
@@ -3069,6 +3117,7 @@ def _filesystemdataset_write(
         CFileSystemDatasetWriteOptions c_options
         shared_ptr[CScanner] c_scanner
         vector[shared_ptr[CRecordBatch]] c_batches
+        dict visit_args
 
     c_options.file_write_options = file_options.unwrap()
     c_options.filesystem = filesystem.unwrap()
@@ -3076,6 +3125,13 @@ def _filesystemdataset_write(
     c_options.partitioning = partitioning.unwrap()
     c_options.max_partitions = max_partitions
     c_options.basename_template = tobytes(basename_template)
+    if file_visitor is not None:
+        visit_args = {'base_dir': c_options.base_dir,
+                      'file_visitor': file_visitor}
+        # Need to use post_finish because parquet metadata is not available
+        # until after Finish has been called
+        c_options.writer_post_finish = BindFunction[cb_writer_finish_internal](
+            &_filesystemdataset_write_visitor, visit_args)
 
     c_scanner = data.unwrap()
     with nogil:
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index b93f492dd38..8b5799e6da2 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -690,7 +690,7 @@ def _ensure_write_partitioning(scheme):
 
 def write_dataset(data, base_dir, basename_template=None, format=None,
                   partitioning=None, schema=None,
                   filesystem=None, file_options=None, use_threads=True,
-                  use_async=False, max_partitions=None):
+                  use_async=False, max_partitions=None, file_visitor=None):
     """
     Write a dataset to a given format and partitioning.
@@ -731,6 +731,25 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
         (e.g. S3)
     max_partitions : int, default 1024
         Maximum number of partitions any batch may be written into.
+    file_visitor : Function
+        If set, this function will be called with a WrittenFile instance
+        for each file created during the call. This object will have both
+        a path attribute and a metadata attribute.
+
+        The path attribute will be a string containing the path to
+        the created file.
+
+        The metadata attribute will be the parquet metadata of the file.
+        This metadata will have the file path attribute set and can be used
+        to build a _metadata file. The metadata attribute will be None if
+        the format is not parquet.
+
+        Example visitor which simply collects the filenames created::
+
+            visited_paths = []
+
+            def file_visitor(written_file):
+                visited_paths.append(written_file.path)
     """
     from pyarrow.fs import _resolve_filesystem_and_path
 
@@ -784,5 +803,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
 
     _filesystemdataset_write(
         scanner, base_dir, basename_template, filesystem, partitioning,
-        file_options, max_partitions
+        file_options, max_partitions, file_visitor
     )
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index 303285905cd..85d4ae2296c 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -81,6 +81,8 @@ cdef extern from "arrow/compute/exec/expression.h" \
     CExtractKnownFieldValues "arrow::compute::ExtractKnownFieldValues"(
         const CExpression& partition_expression)
 
+ctypedef CStatus cb_writer_finish_internal(CFileWriter*)
+ctypedef void cb_writer_finish(dict, CFileWriter*)
 
 
 cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
@@ -223,6 +225,17 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         const shared_ptr[CFileFormat]& format() const
         c_string type_name() const
 
+    cdef cppclass CFileWriter \
+            "arrow::dataset::FileWriter":
+        const shared_ptr[CFileFormat]& format() const
+        const shared_ptr[CSchema]& schema() const
+        const shared_ptr[CFileWriteOptions]& options() const
+        const CFileLocator& destination() const
+
+    cdef cppclass CParquetFileWriter \
+            "arrow::dataset::ParquetFileWriter"(CFileWriter):
+        const shared_ptr[FileWriter]& parquet_writer() const
+
     cdef cppclass CFileFormat "arrow::dataset::FileFormat":
         shared_ptr[CFragmentScanOptions] default_fragment_scan_options
         c_string type_name() const
@@ -263,6 +276,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         shared_ptr[CPartitioning] partitioning
         int max_partitions
         c_string basename_template
+        function[cb_writer_finish_internal] writer_pre_finish
+        function[cb_writer_finish_internal] writer_post_finish
 
     cdef cppclass CFileSystemDataset \
             "arrow::dataset::FileSystemDataset"(CDataset):
diff --git a/python/pyarrow/includes/libarrow_fs.pxd b/python/pyarrow/includes/libarrow_fs.pxd
index 52ef97e5757..eef3757bff0 100644
--- a/python/pyarrow/includes/libarrow_fs.pxd
+++ b/python/pyarrow/includes/libarrow_fs.pxd
@@ -52,6 +52,10 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
         c_bool allow_not_found
         c_bool recursive
 
+    cdef cppclass CFileLocator "arrow::fs::FileLocator":
+        shared_ptr[CFileSystem] filesystem
+        c_string path
+
     cdef cppclass CFileSystem "arrow::fs::FileSystem":
         shared_ptr[CFileSystem] shared_from_this()
         c_string type_name() const
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 22763680cd1..c578661851f 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -1958,8 +1958,10 @@ def write_to_dataset(table, root_path, partition_cols=None,
             "implementation."
         )
         metadata_collector = kwargs.pop('metadata_collector', None)
+        file_visitor = None
         if metadata_collector is not None:
-            raise ValueError(msg.format("metadata_collector"))
+            def file_visitor(written_file):
+                metadata_collector.append(written_file.metadata)
         if partition_filename_cb is not None:
             raise ValueError(msg.format("partition_filename_cb"))
@@ -1979,7 +1981,8 @@ def write_to_dataset(table, root_path, partition_cols=None,
         ds.write_dataset(
             table, root_path, filesystem=filesystem,
             format=parquet_format, file_options=write_options, schema=schema,
-            partitioning=partitioning, use_threads=use_threads)
+            partitioning=partitioning, use_threads=use_threads,
+            file_visitor=file_visitor)
         return
 
     fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 3c79d1281cd..b9064f7e828 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -2707,6 +2707,9 @@ def test_feather_format(tempdir, dataset_reader):
 
 
 def _create_parquet_dataset_simple(root_path):
+    """
+    Creates a simple (flat files, no nested partitioning) Parquet dataset
+    """
     import pyarrow.parquet as pq
 
     metadata_collector = []
@@ -2738,6 +2741,36 @@ def test_parquet_dataset_factory(tempdir):
     assert result.num_rows == 40
 
 
+@pytest.mark.parquet
+@pytest.mark.pandas  # write_to_dataset currently requires pandas
+@pytest.mark.parametrize('use_legacy_dataset', [False, True])
+def test_parquet_dataset_factory_roundtrip(tempdir, use_legacy_dataset):
+    # Simple test to ensure we can roundtrip dataset to
+    # _metadata/common_metadata and back. A more complex test
+    # using partitioning will have to wait for ARROW-13269. The
+    # above test (test_parquet_dataset_factory) will not work
+    # when legacy is False as there is no "append" equivalent in
+    # the new dataset until ARROW-12358
+    import pyarrow.parquet as pq
+    root_path = tempdir / "test_parquet_dataset"
+    table = pa.table({'f1': [0] * 10, 'f2': np.random.randn(10)})
+    metadata_collector = []
+    pq.write_to_dataset(
+        table, str(root_path), metadata_collector=metadata_collector,
+        use_legacy_dataset=use_legacy_dataset
+    )
+    metadata_path = str(root_path / '_metadata')
+    # write _metadata file
+    pq.write_metadata(
+        table.schema, metadata_path,
+        metadata_collector=metadata_collector
+    )
+    dataset = ds.parquet_dataset(metadata_path)
+    assert dataset.schema.equals(table.schema)
+    result = dataset.to_table()
+    assert result.num_rows == 10
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_parquet_dataset_factory_invalid(tempdir):
@@ -3129,10 +3162,29 @@ def test_write_dataset_use_threads(tempdir):
         pa.schema([("part", pa.string())]), flavor="hive")
 
     target1 = tempdir / 'partitioned1'
+    paths_written = []
+
+    def file_visitor(written_file):
+        paths_written.append(written_file.path)
+
     ds.write_dataset(
         dataset, target1, format="feather", partitioning=partitioning,
-        use_threads=True
+        use_threads=True, file_visitor=file_visitor
     )
+
+    # Since it is a multi-threaded write there is no way to know which
+    # directory gets part-0 and which gets part-1
+    expected_paths_a = {
+        target1 / 'part=a' / 'part-0.feather',
+        target1 / 'part=b' / 'part-1.feather'
+    }
+    expected_paths_b = {
+        target1 / 'part=a' / 'part-1.feather',
+        target1 / 'part=b' / 'part-0.feather'
+    }
+    paths_written_set = set(map(pathlib.Path, paths_written))
+    assert paths_written_set in [expected_paths_a, expected_paths_b]
+
     target2 = tempdir / 'partitioned2'
     ds.write_dataset(
         dataset,
         target2, format="feather", partitioning=partitioning,
@@ -3164,19 +3216,28 @@ def test_write_table(tempdir):
 
     # with partitioning
     base_dir = tempdir / 'partitioned'
+    expected_paths = [
+        base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
+        base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
+    ]
+
+    visited_paths = []
+
+    def file_visitor(written_file):
+        visited_paths.append(written_file.path)
+
     partitioning = ds.partitioning(
         pa.schema([("part", pa.string())]), flavor="hive")
     ds.write_dataset(table, base_dir, format="feather",
                      basename_template='dat_{i}.arrow',
-                     partitioning=partitioning)
+                     partitioning=partitioning, file_visitor=file_visitor)
     file_paths = list(base_dir.rglob("*"))
-    expected_paths = [
-        base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
-        base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow"
-    ]
     assert set(file_paths) == set(expected_paths)
     result = ds.dataset(base_dir, format="ipc", partitioning=partitioning)
     assert result.to_table().equals(table)
+    assert len(visited_paths) == 2
+    for visited_path in visited_paths:
+        assert pathlib.Path(visited_path) in expected_paths
 
 
 def test_write_table_multiple_fragments(tempdir):
@@ -3349,6 +3410,69 @@ def test_write_dataset_csv(tempdir):
     assert result.equals(table)
 
 
+@pytest.mark.parquet
+def test_write_dataset_parquet_file_visitor(tempdir):
+    table = pa.table([
+        pa.array(range(20)), pa.array(np.random.randn(20)),
+        pa.array(np.repeat(['a', 'b'], 10))
+    ], names=["f1", "f2", "part"])
+
+    visitor_called = False
+
+    def file_visitor(written_file):
+        nonlocal visitor_called
+        if (written_file.metadata is not None and
+                written_file.metadata.num_columns == 3):
+            visitor_called = True
+
+    base_dir = tempdir / 'parquet_dataset'
+    ds.write_dataset(table, base_dir, format="parquet",
+                     file_visitor=file_visitor)
+
+    assert visitor_called
+
+
+def test_partition_dataset_parquet_file_visitor(tempdir):
+    f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
+    f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
+    table = pa.table({'f1': f1_vals, 'f2': f2_vals,
+                      'part': np.repeat(['a', 'b'], 20)})
+
+    root_path = tempdir / 'partitioned'
+    partitioning = ds.partitioning(
+        pa.schema([("part", pa.string())]), flavor="hive")
+
+    paths_written = []
+
+    sample_metadata = None
+
+    def file_visitor(written_file):
+        nonlocal sample_metadata
+        if written_file.metadata:
+            sample_metadata = written_file.metadata
+        paths_written.append(written_file.path)
+
+    ds.write_dataset(
+        table, root_path, format="parquet", partitioning=partitioning,
+        use_threads=True, file_visitor=file_visitor
+    )
+
+    # Since it is a multi-threaded write there is no way to know which
+    # directory gets part-0 and which gets part-1
+    expected_paths_a = {
+        root_path / 'part=a' / 'part-0.parquet',
+        root_path / 'part=b' / 'part-1.parquet'
+    }
+    expected_paths_b = {
+        root_path / 'part=a' / 'part-1.parquet',
+        root_path / 'part=b' / 'part-0.parquet'
+    }
+    paths_written_set = set(map(pathlib.Path, paths_written))
+    assert paths_written_set in [expected_paths_a, expected_paths_b]
+    assert sample_metadata is not None
+    assert sample_metadata.num_columns == 2
+
+
 @pytest.mark.parquet
 @pytest.mark.pandas
 def test_write_dataset_arrow_schema_metadata(tempdir):
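
A minimal end-to-end sketch of the hook introduced by this patch (not part of the diff itself). It assumes a pyarrow build with the change applied; the table contents and the temporary output directory are placeholders. It collects the WrittenFile objects handed to file_visitor and rebuilds a _metadata summary file from their parquet footers, which is what pq.write_to_dataset(..., metadata_collector=...) now does internally through writer_post_finish::

    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    base_dir = tempfile.mkdtemp()   # throwaway output directory (placeholder)
    table = pa.table({'f1': list(range(10)),
                      'f2': [x * 0.1 for x in range(10)]})

    collected = []                  # WrittenFile objects seen by the visitor


    def file_visitor(written_file):
        # written_file.path is the full path of the created file;
        # written_file.metadata is the parquet FileMetaData (None for other
        # formats) with its file_path already set relative to base_dir.
        collected.append(written_file)


    ds.write_dataset(table, base_dir, format="parquet",
                     file_visitor=file_visitor)

    # Rebuild a _metadata summary file from the collected footers, mirroring
    # what the metadata_collector path in pq.write_to_dataset now does.
    pq.write_metadata(table.schema, base_dir + "/_metadata",
                      metadata_collector=[w.metadata for w in collected])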