diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 0ef1d4577cd..badd18bf318 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -826,15 +826,6 @@ Status ScannerBuilder::UseThreads(bool use_threads) {
   return Status::OK();
 }
 
-Status ScannerBuilder::FragmentReadahead(int fragment_readahead) {
-  if (fragment_readahead <= 0) {
-    return Status::Invalid("FragmentReadahead must be greater than 0, got ",
-                           fragment_readahead);
-  }
-  scan_options_->fragment_readahead = fragment_readahead;
-  return Status::OK();
-}
-
 Status ScannerBuilder::BatchSize(int64_t batch_size) {
   if (batch_size <= 0) {
     return Status::Invalid("BatchSize must be greater than 0, got ", batch_size);
@@ -843,6 +834,24 @@ Status ScannerBuilder::BatchSize(int64_t batch_size) {
   return Status::OK();
 }
 
+Status ScannerBuilder::BatchReadahead(int32_t batch_readahead) {
+  if (batch_readahead < 0) {
+    return Status::Invalid("BatchReadahead must be greater than or equal to 0, got ",
+                           batch_readahead);
+  }
+  scan_options_->batch_readahead = batch_readahead;
+  return Status::OK();
+}
+
+Status ScannerBuilder::FragmentReadahead(int32_t fragment_readahead) {
+  if (fragment_readahead < 0) {
+    return Status::Invalid("FragmentReadahead must be greater than or equal to 0, got ",
+                           fragment_readahead);
+  }
+  scan_options_->fragment_readahead = fragment_readahead;
+  return Status::OK();
+}
+
 Status ScannerBuilder::Pool(MemoryPool* pool) {
   scan_options_->pool = pool;
   return Status::OK();
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index 7098bad8f45..646cc0de72e 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -82,7 +82,7 @@ struct ARROW_DS_EXPORT ScanOptions {
   /// Maximum row count for scanned batches.
   int64_t batch_size = kDefaultBatchSize;
 
-  /// How many batches to read ahead within a file
+  /// How many batches to read ahead within a fragment.
   ///
   /// Set to 0 to disable batch readahead
   ///
@@ -373,9 +373,6 @@ class ARROW_DS_EXPORT ScannerBuilder {
   /// ThreadPool found in ScanOptions;
   Status UseThreads(bool use_threads = true);
 
-  /// \brief Limit how many fragments the scanner will read at once
-  Status FragmentReadahead(int fragment_readahead);
-
   /// \brief Set the maximum number of rows per RecordBatch.
   ///
   /// \param[in] batch_size the maximum number of rows.
@@ -384,6 +381,24 @@ class ARROW_DS_EXPORT ScannerBuilder {
   /// This option provides a control limiting the memory owned by any RecordBatch.
   Status BatchSize(int64_t batch_size);
 
+  /// \brief Set the number of batches to read ahead within a fragment.
+  ///
+  /// \param[in] batch_readahead How many batches to read ahead within a fragment
+  /// \returns an error if this number is less than 0.
+  ///
+  /// This option provides a control on the RAM vs I/O tradeoff.
+  /// It might not be supported by all file formats, in which case it will
+  /// simply be ignored.
+  Status BatchReadahead(int32_t batch_readahead);
+
+  /// \brief Set the number of fragments to read ahead.
+  ///
+  /// \param[in] fragment_readahead How many fragments to read ahead
+  /// \returns an error if this number is less than 0.
+  ///
+  /// This option provides a control on the RAM vs I/O tradeoff.
+  Status FragmentReadahead(int32_t fragment_readahead);
+
   /// \brief Set the pool from which materialized and scanned arrays will be allocated.
   Status Pool(MemoryPool* pool);
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 68833a5350e..b9e333c3d28 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -2168,11 +2168,14 @@ cdef class TaggedRecordBatchIterator(_Weakrefable):
 
 _DEFAULT_BATCH_SIZE = 2**17
-
+_DEFAULT_BATCH_READAHEAD = 16
+_DEFAULT_FRAGMENT_READAHEAD = 4
 
 cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
                             object columns=None, Expression filter=None,
                             int batch_size=_DEFAULT_BATCH_SIZE,
+                            int batch_readahead=_DEFAULT_BATCH_READAHEAD,
+                            int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
                             bint use_threads=True, MemoryPool memory_pool=None,
                             FragmentScanOptions fragment_scan_options=None)\
         except *:
@@ -2207,6 +2210,8 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr,
         )
 
     check_status(builder.BatchSize(batch_size))
+    check_status(builder.BatchReadahead(batch_readahead))
+    check_status(builder.FragmentReadahead(fragment_readahead))
     check_status(builder.UseThreads(use_threads))
     if memory_pool:
         check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool)))
@@ -2254,6 +2259,12 @@ cdef class Scanner(_Weakrefable):
         The maximum row count for scanned record batches. If scanned
         record batches are overflowing memory then this method can be
         called to reduce their size.
+    batch_readahead : int, default 16
+        The number of batches to read ahead in a file. Increasing this number
+        will increase RAM usage but could also improve IO utilization.
+    fragment_readahead : int, default 4
+        The number of files to read ahead. Increasing this number will increase
+        RAM usage but could also improve IO utilization.
     use_threads : bool, default True
         If enabled, then maximum parallelism will be used determined by
         the number of available CPU cores.
@@ -2291,6 +2302,8 @@ cdef class Scanner(_Weakrefable):
                      MemoryPool memory_pool=None,
                      object columns=None, Expression filter=None,
                      int batch_size=_DEFAULT_BATCH_SIZE,
+                     int batch_readahead=_DEFAULT_BATCH_READAHEAD,
+                     int fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
                      FragmentScanOptions fragment_scan_options=None):
         """
         Create Scanner from Dataset,
@@ -2328,6 +2341,13 @@ cdef class Scanner(_Weakrefable):
             The maximum row count for scanned record batches. If scanned
             record batches are overflowing memory then this method can be
            called to reduce their size.
+        batch_readahead : int, default 16
+            The number of batches to read ahead in a file. This might not work
+            for all file formats. Increasing this number will increase
+            RAM usage but could also improve IO utilization.
+        fragment_readahead : int, default 4
+            The number of files to read ahead. Increasing this number will increase
+            RAM usage but could also improve IO utilization.
         use_threads : bool, default True
             If enabled, then maximum parallelism will be used determined by
             the number of available CPU cores.
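
[Editor's aside, not part of the patch: with the bindings above, the new knobs surface as ordinary keyword arguments on the scanner factories. A minimal usage sketch, assuming a local directory of Parquet files (the path is illustrative):

    import pyarrow.dataset as ds

    # Any directory of Parquet files would do; this path is hypothetical.
    dataset = ds.dataset("/tmp/parquet_dir", format="parquet")

    # Trade RAM for I/O: buffer up to 8 decoded batches per fragment and
    # start reading up to 2 fragments ahead of the consumer.
    scanner = ds.Scanner.from_dataset(dataset,
                                      batch_readahead=8,
                                      fragment_readahead=2)
    table = scanner.to_table()

Both setters now accept 0 (which disables readahead, per the ScanOptions comment); only negative values are rejected, unlike the old FragmentReadahead, which also rejected 0.]
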
@@ -2354,7 +2374,8 @@ cdef class Scanner(_Weakrefable):
         builder = make_shared[CScannerBuilder](dataset.unwrap(), options)
 
         _populate_builder(builder, columns=columns, filter=filter,
-                          batch_size=batch_size, use_threads=use_threads,
+                          batch_size=batch_size, batch_readahead=batch_readahead,
+                          fragment_readahead=fragment_readahead, use_threads=use_threads,
                           memory_pool=memory_pool,
                           fragment_scan_options=fragment_scan_options)
 
@@ -2367,6 +2388,7 @@ cdef class Scanner(_Weakrefable):
                       MemoryPool memory_pool=None,
                       object columns=None, Expression filter=None,
                       int batch_size=_DEFAULT_BATCH_SIZE,
+                      int batch_readahead=_DEFAULT_BATCH_READAHEAD,
                       FragmentScanOptions fragment_scan_options=None):
         """
         Create Scanner from Fragment,
@@ -2406,6 +2428,10 @@ cdef class Scanner(_Weakrefable):
             The maximum row count for scanned record batches. If scanned
             record batches are overflowing memory then this method can be
             called to reduce their size.
+        batch_readahead : int, default 16
+            The number of batches to read ahead in a file. This might not work
+            for all file formats. Increasing this number will increase
+            RAM usage but could also improve IO utilization.
         use_threads : bool, default True
             If enabled, then maximum parallelism will be used determined by
             the number of available CPU cores.
@@ -2435,7 +2461,9 @@ cdef class Scanner(_Weakrefable):
         builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema),
                                                fragment.unwrap(), options)
         _populate_builder(builder, columns=columns, filter=filter,
-                          batch_size=batch_size, use_threads=use_threads,
+                          batch_size=batch_size, batch_readahead=batch_readahead,
+                          fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD,
+                          use_threads=use_threads,
                           memory_pool=memory_pool,
                           fragment_scan_options=fragment_scan_options)
 
@@ -2508,7 +2536,8 @@ cdef class Scanner(_Weakrefable):
                       FutureWarning)
 
         _populate_builder(builder, columns=columns, filter=filter,
-                          batch_size=batch_size, use_threads=use_threads,
+                          batch_size=batch_size, batch_readahead=_DEFAULT_BATCH_READAHEAD,
+                          fragment_readahead=_DEFAULT_FRAGMENT_READAHEAD, use_threads=use_threads,
                           memory_pool=memory_pool,
                           fragment_scan_options=fragment_scan_options)
         scanner = GetResultValue(builder.get().Finish())
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index bd8fbd1b56a..d418830bc2f 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -122,6 +122,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CStatus UseThreads(c_bool use_threads)
         CStatus Pool(CMemoryPool* pool)
         CStatus BatchSize(int64_t batch_size)
+        CStatus BatchReadahead(int32_t batch_readahead)
+        CStatus FragmentReadahead(int32_t fragment_readahead)
         CStatus FragmentScanOptions(
             shared_ptr[CFragmentScanOptions] fragment_scan_options)
         CResult[shared_ptr[CScanner]] Finish()
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b900e694a91..851b25a2642 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -423,6 +423,13 @@ def test_dataset(dataset, dataset_reader):
                            False, False, True, True]
 
 
+@pytest.mark.parquet
+def test_scanner_options(dataset):
+    scanner = dataset.to_batches(fragment_readahead=16, batch_readahead=8)
+    batch = next(scanner)
+    assert batch.num_columns == 7
+
+
 @pytest.mark.parquet
 def test_scanner(dataset, dataset_reader):
     scanner = dataset_reader.scanner(
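
[Editor's aside, not part of the patch: the new test drives both options through Dataset.to_batches, which forwards its keyword arguments to the scanner builder. A sketch of the same path outside the test fixture, assuming a single Parquet file written on the spot (path and data are illustrative; the test's `dataset` fixture supplies a 7-column table, hence the num_columns assertion above):

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # Write a small illustrative file and open it as a dataset.
    pq.write_table(pa.table({"x": list(range(1000))}), "/tmp/example.parquet")
    dataset = ds.dataset("/tmp/example.parquet", format="parquet")

    # Keywords are forwarded to ScannerBuilder::BatchReadahead /
    # ::FragmentReadahead; a negative value raises pyarrow.ArrowInvalid
    # (check_status converts the Status::Invalid returned in scanner.cc).
    batches = dataset.to_batches(batch_readahead=8, fragment_readahead=16)
    first = next(batches)
    assert first.num_rows > 0
]
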