3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/file_base.cc
@@ -544,7 +544,8 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
for (const auto& part_queue : state.queues) {
task_group->Append([&] {
RETURN_NOT_OK(write_options.writer_pre_finish(part_queue.second->writer().get()));
-      return part_queue.second->writer()->Finish();
+      RETURN_NOT_OK(part_queue.second->writer()->Finish());
+      return write_options.writer_post_finish(part_queue.second->writer().get());
});
}
return task_group->Finish();
6 changes: 6 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
@@ -360,6 +360,12 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
return Status::OK();
};

/// Callback to be invoked against all FileWriters after
/// FileWriter::Finish() has been called on them.
std::function<Status(FileWriter*)> writer_post_finish = [](FileWriter*) {
return Status::OK();
};

const std::shared_ptr<FileFormat>& format() const {
return file_write_options->format();
}
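Taken together, these two C++ changes run each writer through writer_pre_finish, then FileWriter::Finish(), then writer_post_finish, so a post-finish callback always observes a completed file. A minimal sketch of the Python-level behaviour this enables, using the file_visitor parameter added later in this diff (the output path is illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds

    def visitor(written_file):
        # Finish() has already run, so for parquet files the footer exists
        # and written_file.metadata is fully populated.
        print(written_file.path, written_file.metadata.num_rows)

    table = pa.table({"n": [1, 2, 3]})
    ds.write_dataset(table, "/tmp/post_finish_demo", format="parquet",
                     file_visitor=visitor)
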
58 changes: 57 additions & 1 deletion python/pyarrow/_dataset.pyx
@@ -3038,7 +3038,7 @@ def _get_partition_keys(Expression partition_expression):

For example, an expression of
<pyarrow.dataset.Expression ((part == A:string) and (year == 2016:int32))>
-    is converted to {'part': 'a', 'year': 2016}
+    is converted to {'part': 'A', 'year': 2016}
"""
cdef:
CExpression expr = partition_expression.unwrap()
@@ -3053,6 +3053,53 @@ def _get_partition_keys(Expression partition_expression):
return out


ctypedef CParquetFileWriter* _CParquetFileWriterPtr

cdef class WrittenFile(_Weakrefable):
"""
Metadata information about files written as
part of a dataset write operation
"""

"""The full path to the created file"""
cdef public str path
"""
If the file is a parquet file this will contain the parquet metadata.
This metadata will have the file path attribute set to the path of
the written file.
"""
cdef public object metadata

def __init__(self, path, metadata):
self.path = path
self.metadata = metadata

cdef void _filesystemdataset_write_visitor(
dict visit_args,
CFileWriter* file_writer):
cdef:
str path
str base_dir
WrittenFile written_file
FileMetaData parquet_metadata
CParquetFileWriter* parquet_file_writer

parquet_metadata = None
path = frombytes(deref(file_writer).destination().path)
if deref(deref(file_writer).format()).type_name() == b"parquet":
parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
with nogil:
metadata = deref(
deref(parquet_file_writer).parquet_writer()).metadata()
if metadata:
base_dir = frombytes(visit_args['base_dir'])
parquet_metadata = FileMetaData()
parquet_metadata.init(metadata)
parquet_metadata.set_file_path(os.path.relpath(path, base_dir))
written_file = WrittenFile(path, parquet_metadata)
visit_args['file_visitor'](written_file)


def _filesystemdataset_write(
Scanner data not None,
object base_dir not None,
@@ -3061,6 +3108,7 @@ def _filesystemdataset_write(
Partitioning partitioning not None,
FileWriteOptions file_options not None,
int max_partitions,
object file_visitor
):
"""
CFileSystemDataset.Write wrapper
@@ -3069,13 +3117,21 @@ def _filesystemdataset_write(
CFileSystemDatasetWriteOptions c_options
shared_ptr[CScanner] c_scanner
vector[shared_ptr[CRecordBatch]] c_batches
dict visit_args

c_options.file_write_options = file_options.unwrap()
c_options.filesystem = filesystem.unwrap()
c_options.base_dir = tobytes(_stringify_path(base_dir))
c_options.partitioning = partitioning.unwrap()
c_options.max_partitions = max_partitions
c_options.basename_template = tobytes(basename_template)
if file_visitor is not None:
visit_args = {'base_dir': c_options.base_dir,
'file_visitor': file_visitor}
# Need to use post_finish because parquet metadata is not available
# until after Finish has been called
c_options.writer_post_finish = BindFunction[cb_writer_finish_internal](
&_filesystemdataset_write_visitor, visit_args)

c_scanner = data.unwrap()
with nogil:
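Because _filesystemdataset_write_visitor calls set_file_path() with the path relativized against base_dir, the FileMetaData objects handed to the visitor can be appended straight into a _metadata sidecar. A sketch assuming all written files share one schema (directory name is illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    collected = []

    def file_visitor(written_file):
        # The file_path inside this metadata is already relative to base_dir.
        collected.append(written_file.metadata)

    table = pa.table({"x": [1, 2, 3]})
    ds.write_dataset(table, "/tmp/metadata_demo", format="parquet",
                     file_visitor=file_visitor)
    pq.write_metadata(table.schema, "/tmp/metadata_demo/_metadata",
                      metadata_collector=collected)
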
23 changes: 21 additions & 2 deletions python/pyarrow/dataset.py
@@ -690,7 +690,7 @@ def _ensure_write_partitioning(scheme):
def write_dataset(data, base_dir, basename_template=None, format=None,
partitioning=None, schema=None,
filesystem=None, file_options=None, use_threads=True,
-                  use_async=False, max_partitions=None):
+                  use_async=False, max_partitions=None, file_visitor=None):
"""
Write a dataset to a given format and partitioning.

@@ -731,6 +731,25 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
(e.g. S3)
max_partitions : int, default 1024
Maximum number of partitions any batch may be written into.
file_visitor : Function
If set, this function will be called with a WrittenFile instance
for each file created during the call. This object will have both
a path attribute and a metadata attribute.

The path attribute will be a string containing the path to
the created file.

The metadata attribute will be the parquet metadata of the file.
This metadata will have the file path attribute set and can be used
to build a _metadata file. The metadata attribute will be None if
the format is not parquet.

Example visitor which simply collects the filenames created::

visited_paths = []

def file_visitor(written_file):
visited_paths.append(written_file.path)
"""
from pyarrow.fs import _resolve_filesystem_and_path

@@ -784,5 +803,5 @@ def write_dataset(data, base_dir, basename_template=None, format=None,

_filesystemdataset_write(
scanner, base_dir, basename_template, filesystem, partitioning,
-        file_options, max_partitions
+        file_options, max_partitions, file_visitor
)
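As the new docstring notes, metadata is only populated for parquet; for any other format the visitor still receives the path. A sketch assuming the built-in "ipc" format (path is illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds

    visited_paths = []

    def file_visitor(written_file):
        # written_file.metadata is None for non-parquet formats such as ipc.
        visited_paths.append(written_file.path)

    table = pa.table({"x": [1, 2, 3]})
    ds.write_dataset(table, "/tmp/ipc_demo", format="ipc",
                     file_visitor=file_visitor)
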
15 changes: 15 additions & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -81,6 +81,8 @@ cdef extern from "arrow/compute/exec/expression.h" \
CExtractKnownFieldValues "arrow::compute::ExtractKnownFieldValues"(
const CExpression& partition_expression)

ctypedef CStatus cb_writer_finish_internal(CFileWriter*)
ctypedef void cb_writer_finish(dict, CFileWriter*)

cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:

@@ -223,6 +225,17 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
const shared_ptr[CFileFormat]& format() const
c_string type_name() const

cdef cppclass CFileWriter \
"arrow::dataset::FileWriter":
const shared_ptr[CFileFormat]& format() const
const shared_ptr[CSchema]& schema() const
const shared_ptr[CFileWriteOptions]& options() const
const CFileLocator& destination() const

cdef cppclass CParquetFileWriter \
"arrow::dataset::ParquetFileWriter"(CFileWriter):
const shared_ptr[FileWriter]& parquet_writer() const

cdef cppclass CFileFormat "arrow::dataset::FileFormat":
shared_ptr[CFragmentScanOptions] default_fragment_scan_options
c_string type_name() const
@@ -263,6 +276,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
shared_ptr[CPartitioning] partitioning
int max_partitions
c_string basename_template
function[cb_writer_finish_internal] writer_pre_finish
function[cb_writer_finish_internal] writer_post_finish

cdef cppclass CFileSystemDataset \
"arrow::dataset::FileSystemDataset"(CDataset):
4 changes: 4 additions & 0 deletions python/pyarrow/includes/libarrow_fs.pxd
@@ -52,6 +52,10 @@ cdef extern from "arrow/filesystem/api.h" namespace "arrow::fs" nogil:
c_bool allow_not_found
c_bool recursive

cdef cppclass CFileLocator "arrow::fs::FileLocator":
shared_ptr[CFileSystem] filesystem
c_string path

cdef cppclass CFileSystem "arrow::fs::FileSystem":
shared_ptr[CFileSystem] shared_from_this()
c_string type_name() const
7 changes: 5 additions & 2 deletions python/pyarrow/parquet.py
@@ -1958,8 +1958,10 @@ def write_to_dataset(table, root_path, partition_cols=None,
"implementation."
)
metadata_collector = kwargs.pop('metadata_collector', None)
file_visitor = None
if metadata_collector is not None:
-        raise ValueError(msg.format("metadata_collector"))
+        def file_visitor(written_file):
+            metadata_collector.append(written_file.metadata)
if partition_filename_cb is not None:
raise ValueError(msg.format("partition_filename_cb"))

@@ -1979,7 +1981,8 @@ ds.write_dataset(
ds.write_dataset(
table, root_path, filesystem=filesystem,
format=parquet_format, file_options=write_options, schema=schema,
-            partitioning=partitioning, use_threads=use_threads)
+            partitioning=partitioning, use_threads=use_threads,
+            file_visitor=file_visitor)
return

fs, root_path = legacyfs.resolve_filesystem_and_path(root_path, filesystem)
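With this change, metadata_collector works on the new dataset implementation by piggybacking on file_visitor instead of raising. A sketch of the end-to-end flow (paths are illustrative; a single shared schema is assumed):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"year": [2016, 2017], "n": [1, 2]})
    collected = []
    pq.write_to_dataset(table, "/tmp/pq_ds", use_legacy_dataset=False,
                        metadata_collector=collected)
    # Each collected FileMetaData already carries a relative file_path,
    # so the pieces can be merged into a _metadata sidecar.
    pq.write_metadata(table.schema, "/tmp/pq_ds/_metadata",
                      metadata_collector=collected)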