From 57c0f457ee0974161a7280f57d6178761307bf8c Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Mon, 19 Jul 2021 16:07:14 -0400
Subject: [PATCH 1/3] ARROW-13338: [C++][Dataset][R][Python] Default to async
 scanning

---
 cpp/src/arrow/dataset/scanner.h |  2 +-
 python/pyarrow/_dataset.pyx     | 12 ++++++------
 python/pyarrow/dataset.py       |  4 ++--
 r/R/dataset-scan.R              |  4 ++--
 4 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index fc715206d7d..317887884b4 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -115,7 +115,7 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// If true then an asynchronous implementation of the scanner will be used.
   /// This implementation is newer and generally performs better. However, it
   /// makes extensive use of threading and is still considered experimental
-  bool use_async = false;
+  bool use_async = true;
 
   /// Fragment-specific scan options.
   std::shared_ptr<FragmentScanOptions> fragment_scan_options;
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 07684eff3b4..c8f388404f6 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -402,7 +402,7 @@ cdef class Dataset(_Weakrefable):
         use_threads : bool, default True
             If enabled, then maximum parallelism will be used determined by
             the number of available CPU cores.
-        use_async : bool, default False
+        use_async : bool, default True
             If enabled, an async scanner will be used that should offer
             better performance with high-latency/highly-parallel filesystems
             (e.g. S3)
@@ -2789,7 +2789,7 @@ _DEFAULT_BATCH_SIZE = 2**20
 cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
                             object columns=None, Expression filter=None,
                             int batch_size=_DEFAULT_BATCH_SIZE,
-                            bint use_threads=True, bint use_async=False,
+                            bint use_threads=True, bint use_async=True,
                             MemoryPool memory_pool=None,
                             FragmentScanOptions fragment_scan_options=None)\
         except *:
@@ -2866,7 +2866,7 @@ cdef class Scanner(_Weakrefable):
         use_threads : bool, default True
             If enabled, then maximum parallelism will be used determined by
             the number of available CPU cores.
-        use_async : bool, default False
+        use_async : bool, default True
            If enabled, an async scanner will be used that should offer
            better performance with high-latency/highly-parallel filesystems
            (e.g. S3)
@@ -2897,7 +2897,7 @@ cdef class Scanner(_Weakrefable):
 
     @staticmethod
     def from_dataset(Dataset dataset not None,
-                     bint use_threads=True, bint use_async=False,
+                     bint use_threads=True, bint use_async=True,
                      MemoryPool memory_pool=None,
                      object columns=None, Expression filter=None,
                      int batch_size=_DEFAULT_BATCH_SIZE,
@@ -2918,7 +2918,7 @@ cdef class Scanner(_Weakrefable):
 
     @staticmethod
     def from_fragment(Fragment fragment not None, Schema schema=None,
-                      bint use_threads=True, bint use_async=False,
+                      bint use_threads=True, bint use_async=True,
                       MemoryPool memory_pool=None,
                       object columns=None, Expression filter=None,
                       int batch_size=_DEFAULT_BATCH_SIZE,
@@ -2942,7 +2942,7 @@ cdef class Scanner(_Weakrefable):
 
     @staticmethod
     def from_batches(source, Schema schema=None, bint use_threads=True,
-                     bint use_async=False,
+                     bint use_async=True,
                      MemoryPool memory_pool=None,
                      object columns=None, Expression filter=None,
                      int batch_size=_DEFAULT_BATCH_SIZE,
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 8b5799e6da2..4459abbfcde 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -690,7 +690,7 @@ def _ensure_write_partitioning(scheme):
 
 def write_dataset(data, base_dir, basename_template=None, format=None,
                   partitioning=None, schema=None,
                   filesystem=None, file_options=None, use_threads=True,
-                  use_async=False, max_partitions=None, file_visitor=None):
+                  use_async=True, max_partitions=None, file_visitor=None):
     """
     Write a dataset to a given format and partitioning.
@@ -725,7 +725,7 @@ def write_dataset(data, base_dir, basename_template=None, format=None,
     use_threads : bool, default True
         Write files in parallel. If enabled, then maximum parallelism will
         be used determined by the number of available CPU cores.
-    use_async : bool, default False
+    use_async : bool, default True
        If enabled, an async scanner will be used that should offer
        better performance with high-latency/highly-parallel filesystems
        (e.g. S3)
diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R
index 4fc73485e3a..ac2ddbb2767 100644
--- a/r/R/dataset-scan.R
+++ b/r/R/dataset-scan.R
@@ -33,7 +33,7 @@
 #'   to keep all rows.
 #' * `use_threads`: logical: should scanning use multithreading? Default `TRUE`
 #' * `use_async`: logical: should the async scanner (performs better on
-#'   high-latency/highly parallel filesystems like S3) be used? Default `FALSE`
+#'   high-latency/highly parallel filesystems like S3) be used? Default `TRUE`
 #' * `...`: Additional arguments, currently ignored
 #' @section Methods:
 #' `ScannerBuilder` has the following methods:
@@ -76,7 +76,7 @@ Scanner$create <- function(dataset,
                            fragment_scan_options = NULL,
                            ...) {
   if (is.null(use_async)) {
-    use_async = getOption("arrow.use_async", FALSE)
+    use_async = getOption("arrow.use_async", TRUE)
   }
 
   if (inherits(dataset, "arrow_dplyr_query")) {

From c69f0d276a4fd428f07f5ff3d85f834adf3ca685 Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Tue, 20 Jul 2021 10:43:45 -0400
Subject: [PATCH 2/3] remove references to Scanner::Scan

---
 cpp/src/arrow/dataset/file_base.cc | 71 ++++++------------------------
 1 file changed, 14 insertions(+), 57 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 68c309bea8f..36704636f36 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -431,9 +431,8 @@ struct WriteState {
 };
 
 Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>& fragment,
-                      std::shared_ptr<RecordBatch> batch) {
+                      const std::shared_ptr<RecordBatch>& batch) {
   ARROW_ASSIGN_OR_RAISE(auto groups,
                         state->write_options.partitioning->Partition(batch));
-  batch.reset();  // drop to hopefully conserve memory
   if (groups.batches.size() > static_cast<size_t>(state->write_options.max_partitions)) {
     return Status::Invalid("Fragment would be written into ", groups.batches.size(),
@@ -479,68 +478,26 @@ Status WriteNextBatch(WriteState* state, const std::shared_ptr<Fragment>& fragme
   return Status::OK();
 }
 
-Status WriteInternal(const ScanOptions& scan_options, WriteState* state,
-                     ScanTaskVector scan_tasks) {
-  // Store a mapping from partitions (represented by their formatted partition expressions)
-  // to a WriteQueue which flushes batches into that partition's output file. In principle
-  // any thread could produce a batch for any partition, so each task alternates between
-  // pushing batches and flushing them to disk.
-  auto task_group = scan_options.TaskGroup();
-
-  for (const auto& scan_task : scan_tasks) {
-    task_group->Append([&, scan_task] {
-      std::function<Status(std::shared_ptr<RecordBatch>)> visitor =
-          [&](std::shared_ptr<RecordBatch> batch) {
-            return WriteNextBatch(state, scan_task->fragment(), std::move(batch));
-          };
-      return internal::RunSynchronously<Future<>>(
-          [&](internal::Executor* executor) {
-            return scan_task->SafeVisit(executor, visitor);
-          },
-          /*use_threads=*/false);
-    });
-  }
-  return task_group->Finish();
-}
-
 }  // namespace
 
 Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
                                 std::shared_ptr<Scanner> scanner) {
   RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
 
-  // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
-  //
-  // - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
-  //   any fragments we have before waiting for discovery to complete. This isn't
-  //   currently implemented for FileSystemDataset anyway: ARROW-8613
-  //
-  // - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when
-  //   writing Fragments which produce scan tasks slowly. No Fragments do this.
-  //
-  // NB: neither of these will have any impact whatsoever on the common case of writing
-  //     an in-memory table to disk.
-
-#if defined(__GNUC__) || defined(__clang__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
-#elif defined(_MSC_VER)
-#pragma warning(push)
-#pragma warning(disable : 4996)
-#endif
-
-  // TODO(ARROW-11782/ARROW-12288) Remove calls to Scan()
-  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, scanner->Scan());
-  ARROW_ASSIGN_OR_RAISE(ScanTaskVector scan_tasks, scan_task_it.ToVector());
-
-#if defined(__GNUC__) || defined(__clang__)
-#pragma GCC diagnostic pop
-#elif defined(_MSC_VER)
-#pragma warning(pop)
-#endif
-
   WriteState state(write_options);
-  RETURN_NOT_OK(WriteInternal(*scanner->options(), &state, std::move(scan_tasks)));
+
+  RETURN_NOT_OK(internal::RunSynchronously<Future<>>(
+      [&](internal::Executor* executor) -> Future<> {
+        ARROW_ASSIGN_OR_RAISE(auto batch_gen, scanner->ScanBatchesUnorderedAsync());
+
+        batch_gen = MakeTransferredGenerator(std::move(batch_gen), executor);
+
+        return VisitAsyncGenerator(
+            std::move(batch_gen), [&](const EnumeratedRecordBatch& e) {
+              return WriteNextBatch(&state, e.fragment.value, e.record_batch.value);
+            });
+      },
+      /*use_threads=*/false));
 
   auto task_group = scanner->options()->TaskGroup();
   for (const auto& part_queue : state.queues) {

From 8828e6b750a081079722bca7c734f7e3b63b1ae4 Mon Sep 17 00:00:00 2001
From: Benjamin Kietzman
Date: Tue, 20 Jul 2021 12:55:49 -0400
Subject: [PATCH 3/3] loosen written file comparison

---
 python/pyarrow/tests/test_dataset.py | 29 ++++++++++++++++++++++++----
 1 file changed, 25 insertions(+), 4 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 6f9662471fc..156b73e09c1 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3129,6 +3129,28 @@ def test_legacy_write_to_dataset_drops_null(tempdir):
     assert actual == expected
 
 
+def _check_files_unordered(actual, expected):
+    # specific basenames may be written to the expected partitions
+    # in differing order
+
+    # assert all expected basenames were written somewhere
+    assert set(a.name for a in actual) == set(e.name for e in expected)
+
+    actual_parent_to_count = {}
+    expected_parent_to_count = {}
+
+    for paths, parent_to_count in [(actual, actual_parent_to_count),
+                                   (expected, expected_parent_to_count)]:
+        for p in paths:
+            if p.parent in parent_to_count:
+                parent_to_count[p.parent] += 1
+            else:
+                parent_to_count[p.parent] = 1
+
+    # assert that each directory received the expected number of files
+    assert actual_parent_to_count == expected_parent_to_count
+
+
 def _check_dataset_roundtrip(dataset, base_dir, expected_files,
                              base_dir_path=None, partitioning=None):
     base_dir_path = base_dir_path or base_dir
@@ -3137,8 +3159,7 @@ def _check_dataset_roundtrip(dataset, base_dir, expected_files,
                              partitioning=partitioning, use_threads=False)
 
     # check that all files are present
-    file_paths = list(base_dir_path.rglob("*"))
-    assert set(file_paths) == set(expected_files)
+    _check_files_unordered(list(base_dir_path.rglob("*")), expected_files)
 
     # check that reading back in as dataset gives the same result
     dataset2 = ds.dataset(
@@ -3389,10 +3410,10 @@ def test_write_iterable(tempdir):
 
 
 def test_write_scanner(tempdir, dataset_reader):
-    if dataset_reader.use_async:
+    if not dataset_reader.use_async:
        pytest.skip(
            ('ARROW-12803: Write dataset with scanner does not'
-             ' support async scan'))
+             ' support sync scan'))
    table = pa.table([
        pa.array(range(20)),
        pa.array(np.random.randn(20)),
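
A minimal usage sketch of the behavior these patches change (the temporary
directory and toy table below are illustrative assumptions, not taken from the
patches themselves): `use_async=True` is now implied wherever the flag is not
given, and the previous synchronous scanner remains available by opting out.

    import tempfile

    import pyarrow as pa
    import pyarrow.dataset as ds

    # write a small throwaway dataset so there is something to scan
    path = tempfile.mkdtemp()
    ds.write_dataset(pa.table({"x": list(range(10))}), path, format="parquet")

    dataset = ds.dataset(path, format="parquet")

    # use_async=True is now the default, so this uses the async scanner
    scanner = ds.Scanner.from_dataset(dataset)
    assert scanner.to_table().num_rows == 10

    # the synchronous implementation is still reachable explicitly
    sync_scanner = ds.Scanner.from_dataset(dataset, use_async=False)
    assert sync_scanner.to_table().num_rows == 10

On the R side, `Scanner$create()` falls back to
`getOption("arrow.use_async", TRUE)` when `use_async` is not supplied, so
`options(arrow.use_async = FALSE)` restores the synchronous default globally.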