71 changes: 14 additions & 57 deletions cpp/src/arrow/dataset/file_base.cc
@@ -431,9 +431,8 @@ struct WriteState {
};

Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>& fragment,
std::shared_ptr<RecordBatch> batch) {
const std::shared_ptr<RecordBatch>& batch) {
ARROW_ASSIGN_OR_RAISE(auto groups, state->write_options.partitioning->Partition(batch));
batch.reset(); // drop to hopefully conserve memory

if (groups.batches.size() > static_cast<size_t>(state->write_options.max_partitions)) {
return Status::Invalid("Fragment would be written into ", groups.batches.size(),
@@ -479,68 +478,26 @@ Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>& fragme
return Status::OK();
}

Status WriteInternal(const ScanOptions& scan_options, WriteState* state,
ScanTaskVector scan_tasks) {
// Store a mapping from partitions (represented by their formatted partition expressions)
// to a WriteQueue which flushes batches into that partition's output file. In principle
// any thread could produce a batch for any partition, so each task alternates between
// pushing batches and flushing them to disk.
auto task_group = scan_options.TaskGroup();

for (const auto& scan_task : scan_tasks) {
task_group->Append([&, scan_task] {
std::function<Status(std::shared_ptr<RecordBatch>)> visitor =
[&](std::shared_ptr<RecordBatch> batch) {
return WriteNextBatch(state, scan_task->fragment(), std::move(batch));
};
return internal::RunSynchronously<Future<>>(
[&](internal::Executor* executor) {
return scan_task->SafeVisit(executor, visitor);
},
/*use_threads=*/false);
});
}
return task_group->Finish();
}

} // namespace

Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
std::shared_ptr<Scanner> scanner) {
RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));

// Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
//
// - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
// any fragments we have before waiting for discovery to complete. This isn't
// currently implemented for FileSystemDataset anyway: ARROW-8613
//
// - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when
// writing Fragments which produce scan tasks slowly. No Fragments do this.
//
// NB: neither of these will have any impact whatsoever on the common case of writing
// an in-memory table to disk.

#if defined(__GNUC__) || defined(__clang__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#elif defined(_MSC_VER)
#pragma warning(push)
#pragma warning(disable : 4996)
#endif

// TODO(ARROW-11782/ARROW-12288) Remove calls to Scan()
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan());
ARROW_ASSIGN_OR_RAISE(ScanTaskVector scan_tasks, scan_task_it.ToVector());

#if defined(__GNUC__) || defined(__clang__)
#pragma GCC diagnostic pop
#elif defined(_MSC_VER)
#pragma warning(pop)
#endif

WriteState state(write_options);
RETURN_NOT_OK(WriteInternal(*scanner->options(), &state, std::move(scan_tasks)));

RETURN_NOT_OK(internal::RunSynchronously<Future<>>(
[&](internal::Executor* executor) -> Future<> {
ARROW_ASSIGN_OR_RAISE(auto batch_gen, scanner->ScanBatchesUnorderedAsync());

batch_gen = MakeTransferredGenerator(std::move(batch_gen), executor);

return VisitAsyncGenerator(
std::move(batch_gen), [&](const EnumeratedRecordBatch& e) {
return WriteNextBatch(&state, e.fragment.value, e.record_batch.value);
});
},
/*use_threads=*/false));

auto task_group = scanner->options()->TaskGroup();
for (const auto& part_queue : state.queues) {
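The rewritten write path above partitions each incoming batch (via `Partition(batch)`) and fails if a single batch would fan out into more than `max_partitions` outputs. A minimal Python sketch of that user-facing behaviour — the output directories, column names, and `max_partitions` values are illustrative assumptions, not taken from this PR:

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"part": ["a", "b", "c", "d"] * 5, "value": list(range(20))})
hive = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")

# Succeeds: 4 distinct partition values, well under the limit.
ds.write_dataset(table, tempfile.mkdtemp(), format="parquet",
                 partitioning=hive, max_partitions=1024)

# Would raise pyarrow.ArrowInvalid: a batch fans out into more partitions
# than max_partitions allows (the Status::Invalid path in WriteNextBatch).
# ds.write_dataset(table, tempfile.mkdtemp(), format="parquet",
#                  partitioning=hive, max_partitions=2)
```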
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/scanner.h
@@ -115,7 +115,7 @@ struct ARROW_DS_EXPORT ScanOptions {
/// If true then an asynchronous implementation of the scanner will be used.
/// This implementation is newer and generally performs better. However, it
/// makes extensive use of threading and is still considered experimental
bool use_async = false;
bool use_async = true;

/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;
12 changes: 6 additions & 6 deletions python/pyarrow/_dataset.pyx
@@ -402,7 +402,7 @@ cdef class Dataset(_Weakrefable):
use_threads : bool, default True
If enabled, then maximum parallelism will be used, as determined by
the number of available CPU cores.
use_async : bool, default False
use_async : bool, default True
If enabled, an async scanner will be used that should offer
better performance with high-latency/highly-parallel filesystems
(e.g. S3)
@@ -2789,7 +2789,7 @@ _DEFAULT_BATCH_SIZE = 2**20
cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
object columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
bint use_threads=True, bint use_async=False,
bint use_threads=True, bint use_async=True,
MemoryPool memory_pool=None,
FragmentScanOptions fragment_scan_options=None)\
except *:
@@ -2866,7 +2866,7 @@ cdef class Scanner(_Weakrefable):
use_threads : bool, default True
If enabled, then maximum parallelism will be used, as determined by
the number of available CPU cores.
use_async : bool, default False
use_async : bool, default True
If enabled, an async scanner will be used that should offer
better performance with high-latency/highly-parallel filesystems
(e.g. S3)
@@ -2897,7 +2897,7 @@ cdef class Scanner(_Weakrefable):

@staticmethod
def from_dataset(Dataset dataset not None,
bint use_threads=True, bint use_async=False,
bint use_threads=True, bint use_async=True,
MemoryPool memory_pool=None,
object columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
@@ -2918,7 +2918,7 @@

@staticmethod
def from_fragment(Fragment fragment not None, Schema schema=None,
bint use_threads=True, bint use_async=False,
bint use_threads=True, bint use_async=True,
MemoryPool memory_pool=None,
object columns=None, Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
@@ -2942,7 +2942,7 @@

@staticmethod
def from_batches(source, Schema schema=None, bint use_threads=True,
bint use_async=False,
bint use_async=True,
MemoryPool memory_pool=None, object columns=None,
Expression filter=None,
int batch_size=_DEFAULT_BATCH_SIZE,
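Since all of the `Scanner` factory methods above now default to `use_async=True`, existing code gets the async scanner unless it opts out explicitly. A small sketch under the assumption of a locally written Parquet dataset (the temporary directory and column name are illustrative):

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

base_dir = tempfile.mkdtemp()
ds.write_dataset(pa.table({"x": list(range(10))}), base_dir, format="parquet")
dataset = ds.dataset(base_dir, format="parquet")

# Picks up the async scanner by default after this change.
async_scanner = ds.Scanner.from_dataset(dataset, batch_size=4)

# The synchronous implementation is still available by opting out explicitly.
sync_scanner = ds.Scanner.from_dataset(dataset, use_async=False)

assert async_scanner.to_table().num_rows == sync_scanner.to_table().num_rows
```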
4 changes: 2 additions & 2 deletions python/pyarrow/dataset.py
@@ -690,7 +690,7 @@ def _ensure_write_partitioning(scheme):
def write_dataset(data, base_dir, basename_template=None, format=None,
partitioning=None, schema=None,
filesystem=None, file_options=None, use_threads=True,
use_async=False, max_partitions=None, file_visitor=None):
use_async=True, max_partitions=None, file_visitor=None):
"""
Write a dataset to a given format and partitioning.

@@ -725,7 +725,7 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
use_threads : bool, default True
Write files in parallel. If enabled, then maximum parallelism will be
used, as determined by the number of available CPU cores.
use_async : bool, default False
use_async : bool, default True
If enabled, an async scanner will be used that should offer
better performance with high-latency/highly-parallel filesystems
(e.g. S3)
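`write_dataset` passes `use_async` through to the scan it performs over the input, so after this change the input is scanned asynchronously by default. A hedged sketch (temporary directories and column names are illustrative assumptions):

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"key": ["x", "y"] * 4, "value": list(range(8))})

# New default: the input is scanned with the async scanner.
ds.write_dataset(table, tempfile.mkdtemp(), format="parquet")

# The previous synchronous behaviour can still be requested explicitly.
ds.write_dataset(table, tempfile.mkdtemp(), format="parquet", use_async=False)
```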
29 changes: 25 additions & 4 deletions python/pyarrow/tests/test_dataset.py
@@ -3129,6 +3129,28 @@ def test_legacy_write_to_dataset_drops_null(tempdir):
assert actual == expected


def _check_files_unordered(actual, expected):
    # specific basenames may be written to the expected partitions
    # in differing order

    # materialize in case a generator (e.g. Path.rglob) was passed in,
    # since the paths are iterated twice below
    actual = list(actual)
    expected = list(expected)

    # assert all expected basenames were written somewhere
    assert set(a.name for a in actual) == set(e.name for e in expected)

    actual_parent_to_count = {}
    expected_parent_to_count = {}

    for paths, parent_to_count in zip([actual, expected],
                                      [actual_parent_to_count,
                                       expected_parent_to_count]):
        for p in paths:
            if p.parent in parent_to_count:
                parent_to_count[p.parent] += 1
            else:
                parent_to_count[p.parent] = 1

    # assert that each directory received the expected number of files
    assert actual_parent_to_count == expected_parent_to_count


def _check_dataset_roundtrip(dataset, base_dir, expected_files,
base_dir_path=None, partitioning=None):
base_dir_path = base_dir_path or base_dir
@@ -3137,8 +3159,7 @@ def _check_dataset_roundtrip(dataset, base_dir, expected_files,
partitioning=partitioning, use_threads=False)

# check that all files are present
file_paths = list(base_dir_path.rglob("*"))
assert set(file_paths) == set(expected_files)
_check_files_unordered(base_dir_path.rglob("*"), expected_files)

# check that reading back in as dataset gives the same result
dataset2 = ds.dataset(
@@ -3389,10 +3410,10 @@ def test_write_iterable(tempdir):


def test_write_scanner(tempdir, dataset_reader):
if dataset_reader.use_async:
if not dataset_reader.use_async:
pytest.skip(
('ARROW-12803: Write dataset with scanner does not'
' support async scan'))
' support sync scan'))

table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
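The `test_write_scanner` change above flips the skip condition: writing a dataset directly from a `Scanner` is now exercised with the async scanner and skipped for the synchronous one. A rough sketch of the pattern that test covers, assuming an in-memory source table (the path, column names, and format are illustrative, and `ds.dataset(table)` wrapping an in-memory table is assumed to be available in this version):

```python
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds

table = pa.table({"part": ["a", "b"] * 10, "value": list(range(20))})
dataset = ds.dataset(table)  # in-memory dataset

# The scanner uses the async implementation (now the default), which is the
# supported path for write_dataset(scanner, ...) per the updated test.
scanner = dataset.scanner(use_async=True)

ds.write_dataset(
    scanner, tempfile.mkdtemp(), format="parquet",
    partitioning=ds.partitioning(pa.schema([("part", pa.string())]),
                                 flavor="hive"))
```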
4 changes: 2 additions & 2 deletions r/R/dataset-scan.R
@@ -33,7 +33,7 @@
#' to keep all rows.
#' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
#' * `use_async`: logical: should the async scanner (performs better on
#' high-latency/highly parallel filesystems like S3) be used? Default `FALSE`
#' high-latency/highly parallel filesystems like S3) be used? Default `TRUE`
#' * `...`: Additional arguments, currently ignored
#' @section Methods:
#' `ScannerBuilder` has the following methods:
@@ -76,7 +76,7 @@ Scanner$create <- function(dataset,
fragment_scan_options = NULL,
...) {
if (is.null(use_async)) {
use_async = getOption("arrow.use_async", FALSE)
use_async = getOption("arrow.use_async", TRUE)
}

if (inherits(dataset, "arrow_dplyr_query")) {