From 3f9508a368fb80f37d6739bc569bed840bb9494e Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 12 Mar 2021 10:46:13 -0500 Subject: [PATCH 1/4] ARROW-9749: [C++][Dataset] Add ConvertOptions for CSV datasets --- cpp/src/arrow/dataset/dataset.h | 13 +++++++++++++ cpp/src/arrow/dataset/file_csv.cc | 19 ++++++++++++++----- cpp/src/arrow/dataset/file_csv.h | 12 +++++++++++- cpp/src/arrow/dataset/file_csv_test.cc | 22 ++++++++++++++++++++++ cpp/src/arrow/dataset/scanner.h | 3 +++ 5 files changed, 63 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index afdbe328d5f..30b7449f31a 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -89,6 +89,19 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { std::shared_ptr physical_schema_; }; +/// \brief Per-scan options for fragment(s) in a dataset. +/// +/// These options are not intrinsic to the format or fragment itself, but do affect +/// the results of a scan. These are options which make sense to change between +/// repeated reads of the same dataset, such as format-specific conversion options +/// (that do not affect the schema). +class ARROW_DS_EXPORT FragmentScanOptions { + public: + virtual std::string type_name() const = 0; + virtual std::string ToString() const { return type_name(); } + virtual ~FragmentScanOptions() = default; +}; + /// \brief A trivial Fragment that yields ScanTask out of a fixed set of /// RecordBatch. class ARROW_DS_EXPORT InMemoryFragment : public Fragment { diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 0c023b87dcd..7cc4a8375f0 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -76,12 +76,19 @@ Result> GetColumnNames( static inline Result GetConvertOptions( const CsvFileFormat& format, const std::shared_ptr& scan_options, - const Buffer& first_block, MemoryPool* pool) { + const std::shared_ptr& scan_context, const Buffer& first_block, + MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE( auto column_names, GetColumnNames(format.parse_options, util::string_view{first_block}, pool)); auto convert_options = csv::ConvertOptions::Defaults(); + if (scan_context && scan_context->fragment_scan_options && + scan_context->fragment_scan_options->type_name() == kCsvTypeName) { + auto csv_scan_options = internal::checked_pointer_cast( + scan_context->fragment_scan_options); + convert_options = csv_scan_options->convert_options; + } for (FieldRef ref : scan_options->MaterializedFields()) { ARROW_ASSIGN_OR_RAISE(auto field, ref.GetOne(*scan_options->dataset_schema)); @@ -104,6 +111,7 @@ static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) { static inline Result> OpenReader( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options = nullptr, + const std::shared_ptr& scan_context = nullptr, MemoryPool* pool = default_memory_pool()) { ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); @@ -115,8 +123,9 @@ static inline Result> OpenReader( auto convert_options = csv::ConvertOptions::Defaults(); if (scan_options != nullptr) { - ARROW_ASSIGN_OR_RAISE(convert_options, - GetConvertOptions(format, scan_options, *first_block, pool)); + ARROW_ASSIGN_OR_RAISE( + convert_options, + GetConvertOptions(format, scan_options, scan_context, *first_block, pool)); } auto maybe_reader = @@ -141,8 +150,8 @@ class CsvScanTask : public ScanTask { source_(fragment->source()) {} Result Execute() override { - 
ARROW_ASSIGN_OR_RAISE(auto reader, - OpenReader(source_, *format_, options(), context()->pool)); + ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_, options(), context(), + context()->pool)); return IteratorFromReader(std::move(reader)); } diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 1e83189ce04..b7a9388a324 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -21,6 +21,7 @@ #include #include "arrow/csv/options.h" +#include "arrow/dataset/dataset.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -29,13 +30,15 @@ namespace arrow { namespace dataset { +constexpr char kCsvTypeName[] = "csv"; + /// \brief A FileFormat implementation that reads from and writes to Csv files class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: /// Options affecting the parsing of CSV files csv::ParseOptions parse_options = csv::ParseOptions::Defaults(); - std::string type_name() const override { return "csv"; } + std::string type_name() const override { return kCsvTypeName; } bool Equals(const FileFormat& other) const override; @@ -58,5 +61,12 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { std::shared_ptr DefaultWriteOptions() override { return NULLPTR; } }; +class ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { + public: + std::string type_name() const override { return kCsvTypeName; } + + csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 5c27f81b094..aa9a2b186d0 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -81,6 +81,28 @@ N/A ASSERT_EQ(row_count, 3); } +TEST_F(TestCsvFileFormat, CustomConvertOptions) { + auto source = GetFileSource(R"(str +foo +MYNULL +N/A +bar)"); + SetSchema({field("str", utf8())}); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); + auto fragment_scan_options = std::make_shared(); + fragment_scan_options->convert_options.null_values = {"MYNULL"}; + fragment_scan_options->convert_options.strings_can_be_null = true; + ctx_->fragment_scan_options = fragment_scan_options; + + int64_t null_count = 0; + for (auto maybe_batch : Batches(fragment.get())) { + ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch); + null_count += batch->GetColumnByName("str")->null_count(); + } + + ASSERT_EQ(null_count, 1); +} + TEST_F(TestCsvFileFormat, ScanRecordBatchReaderWithVirtualColumn) { auto source = GetFileSource(R"(f64 1.0 diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index d335fe4d4bd..4af84a4a929 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -46,6 +46,9 @@ struct ARROW_DS_EXPORT ScanContext { /// Indicate if the Scanner should make use of a ThreadPool. bool use_threads = false; + /// Fragment-specific scan options. + std::shared_ptr fragment_scan_options; + /// Return a threaded or serial TaskGroup according to use_threads. 
std::shared_ptr TaskGroup() const; }; From 86224290b056d7fa321711c09dba7a9fe1e35b48 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 12 Mar 2021 11:12:38 -0500 Subject: [PATCH 2/4] ARROW-9749: [C++][Python][Dataset][R] Eliminate all fields from ScanContext --- cpp/src/arrow/dataset/dataset.cc | 5 +++ cpp/src/arrow/dataset/dataset.h | 1 + cpp/src/arrow/dataset/file_base.cc | 8 ++-- cpp/src/arrow/dataset/file_csv.cc | 19 ++++------ cpp/src/arrow/dataset/file_csv_test.cc | 2 +- cpp/src/arrow/dataset/file_ipc.cc | 2 +- cpp/src/arrow/dataset/file_parquet.cc | 4 +- cpp/src/arrow/dataset/scanner.cc | 39 ++++++++++++++------ cpp/src/arrow/dataset/scanner.h | 32 +++++++++------- cpp/src/arrow/dataset/scanner_internal.h | 4 +- cpp/src/arrow/dataset/scanner_test.cc | 4 +- python/pyarrow/_dataset.pyx | 26 ++++++------- python/pyarrow/includes/libarrow_dataset.pxd | 4 +- r/src/dataset.cpp | 6 +-- 14 files changed, 90 insertions(+), 66 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 051e446974d..436b891bd74 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -97,6 +97,11 @@ Dataset::Dataset(std::shared_ptr schema, Expression partition_expression : schema_(std::move(schema)), partition_expression_(std::move(partition_expression)) {} +Result> Dataset::NewScan( + std::shared_ptr options) { + return std::make_shared(this->shared_from_this(), options); +} + Result> Dataset::NewScan( std::shared_ptr context) { return std::make_shared(this->shared_from_this(), context); diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 30b7449f31a..b5af67e6239 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -131,6 +131,7 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: /// \brief Begin to build a new Scan operation against this Dataset + Result> NewScan(std::shared_ptr options); Result> NewScan(std::shared_ptr context); Result> NewScan(); diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 3ee23549130..ba6cd901305 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -339,7 +339,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio std::shared_ptr scanner) { RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template)); - auto task_group = scanner->context()->TaskGroup(); + auto task_group = scanner->options()->TaskGroup(); // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent: // @@ -356,12 +356,12 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector()); ScanTaskVector scan_tasks; - // Avoid contention with multithreaded readers auto context = std::make_shared(*scanner->context()); - context->use_threads = false; for (const auto& fragment : fragments) { auto options = std::make_shared(*scanner->options()); + // Avoid contention with multithreaded readers + options->use_threads = false; ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scanner(fragment, std::move(options), context).Scan()); for (auto maybe_scan_task : scan_task_it) { @@ -434,7 +434,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio } RETURN_NOT_OK(task_group->Finish()); - task_group = scanner->context()->TaskGroup(); + task_group = 
scanner->options()->TaskGroup(); for (const auto& part_queue : queues) { task_group->Append([&] { return part_queue.second->writer()->Finish(); }); } diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 7cc4a8375f0..d8d9439eba5 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -76,17 +76,16 @@ Result> GetColumnNames( static inline Result GetConvertOptions( const CsvFileFormat& format, const std::shared_ptr& scan_options, - const std::shared_ptr& scan_context, const Buffer& first_block, - MemoryPool* pool) { + const Buffer& first_block, MemoryPool* pool) { ARROW_ASSIGN_OR_RAISE( auto column_names, GetColumnNames(format.parse_options, util::string_view{first_block}, pool)); auto convert_options = csv::ConvertOptions::Defaults(); - if (scan_context && scan_context->fragment_scan_options && - scan_context->fragment_scan_options->type_name() == kCsvTypeName) { + if (scan_options && scan_options->fragment_scan_options && + scan_options->fragment_scan_options->type_name() == kCsvTypeName) { auto csv_scan_options = internal::checked_pointer_cast( - scan_context->fragment_scan_options); + scan_options->fragment_scan_options); convert_options = csv_scan_options->convert_options; } @@ -111,7 +110,6 @@ static inline csv::ReadOptions GetReadOptions(const CsvFileFormat& format) { static inline Result> OpenReader( const FileSource& source, const CsvFileFormat& format, const std::shared_ptr& scan_options = nullptr, - const std::shared_ptr& scan_context = nullptr, MemoryPool* pool = default_memory_pool()) { ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); @@ -123,9 +121,8 @@ static inline Result> OpenReader( auto convert_options = csv::ConvertOptions::Defaults(); if (scan_options != nullptr) { - ARROW_ASSIGN_OR_RAISE( - convert_options, - GetConvertOptions(format, scan_options, scan_context, *first_block, pool)); + ARROW_ASSIGN_OR_RAISE(convert_options, + GetConvertOptions(format, scan_options, *first_block, pool)); } auto maybe_reader = @@ -150,8 +147,8 @@ class CsvScanTask : public ScanTask { source_(fragment->source()) {} Result Execute() override { - ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source_, *format_, options(), context(), - context()->pool)); + ARROW_ASSIGN_OR_RAISE(auto reader, + OpenReader(source_, *format_, options(), options()->pool)); return IteratorFromReader(std::move(reader)); } diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index aa9a2b186d0..19d792e6bef 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -92,7 +92,7 @@ bar)"); auto fragment_scan_options = std::make_shared(); fragment_scan_options->convert_options.null_values = {"MYNULL"}; fragment_scan_options->convert_options.strings_can_be_null = true; - ctx_->fragment_scan_options = fragment_scan_options; + opts_->fragment_scan_options = fragment_scan_options; int64_t null_count = 0; for (auto maybe_batch : Batches(fragment.get())) { diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index a8a6425b345..441ea0cd4d4 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -108,7 +108,7 @@ class IpcScanTask : public ScanTask { int i_; }; - return Impl::Make(source_, options_->MaterializedFields(), context_->pool); + return Impl::Make(source_, options_->MaterializedFields(), options_->pool); } private: diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 
d642ed36dc0..672f90ca24f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -291,7 +291,7 @@ Result> ParquetFileFormat::Inspect( Result> ParquetFileFormat::GetReader( const FileSource& source, ScanOptions* options, ScanContext* context) const { - MemoryPool* pool = context ? context->pool : default_memory_pool(); + MemoryPool* pool = options ? options->pool : default_memory_pool(); auto properties = MakeReaderProperties(*this, pool); ARROW_ASSIGN_OR_RAISE(auto input, source.Open()); @@ -310,7 +310,7 @@ Result> ParquetFileFormat::GetReader arrow_properties.set_batch_size(options->batch_size); } - if (context && !context->use_threads) { + if (options && !options->use_threads) { arrow_properties.set_use_threads(reader_options.enable_parallel_column_conversion); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 7ce9bdd9a29..3ae0bb51d00 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -47,6 +47,16 @@ std::vector ScanOptions::MaterializedFields() const { return fields; } +using arrow::internal::TaskGroup; + +std::shared_ptr ScanOptions::TaskGroup() const { + if (use_threads) { + auto* thread_pool = arrow::internal::GetCpuThreadPool(); + return TaskGroup::MakeThreaded(thread_pool); + } + return TaskGroup::MakeSerial(); +} + Result InMemoryScanTask::Execute() { return MakeVectorIterator(record_batches_); } @@ -82,6 +92,16 @@ Result ScanTaskIteratorFromRecordBatch( return fragment->Scan(std::move(options), std::move(context)); } +ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, + std::shared_ptr scan_options) + : dataset_(std::move(dataset)), + fragment_(nullptr), + scan_options_(std::move(scan_options)), + scan_context_(std::make_shared()) { + scan_options_->dataset_schema = dataset_->schema(); + DCHECK_OK(Filter(literal(true))); +} + ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context) : dataset_(std::move(dataset)), @@ -121,7 +141,7 @@ Status ScannerBuilder::Filter(const Expression& filter) { } Status ScannerBuilder::UseThreads(bool use_threads) { - scan_context_->use_threads = use_threads; + scan_options_->use_threads = use_threads; return Status::OK(); } @@ -133,6 +153,11 @@ Status ScannerBuilder::BatchSize(int64_t batch_size) { return Status::OK(); } +Status ScannerBuilder::Pool(MemoryPool* pool) { + scan_options_->pool = pool; + return Status::OK(); +} + Result> ScannerBuilder::Finish() { if (!scan_options_->projection.IsBound()) { RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names())); @@ -144,16 +169,6 @@ Result> ScannerBuilder::Finish() { return std::make_shared(dataset_, scan_options_, scan_context_); } -using arrow::internal::TaskGroup; - -std::shared_ptr ScanContext::TaskGroup() const { - if (use_threads) { - auto* thread_pool = arrow::internal::GetCpuThreadPool(); - return TaskGroup::MakeThreaded(thread_pool); - } - return TaskGroup::MakeSerial(); -} - static inline RecordBatchVector FlattenRecordBatchVector( std::vector nested_batches) { RecordBatchVector flattened; @@ -183,7 +198,7 @@ struct TableAssemblyState { Result> Scanner::ToTable() { ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan()); - auto task_group = scan_context_->TaskGroup(); + auto task_group = scan_options_->TaskGroup(); /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't /// invalidate concurrently running tasks when Finish() early returns diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h 
index 4af84a4a929..b946d45e3ca 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -39,19 +39,7 @@ namespace dataset { constexpr int64_t kDefaultBatchSize = 1 << 20; /// \brief Shared state for a Scan operation -struct ARROW_DS_EXPORT ScanContext { - /// A pool from which materialized and scanned arrays will be allocated. - MemoryPool* pool = arrow::default_memory_pool(); - - /// Indicate if the Scanner should make use of a ThreadPool. - bool use_threads = false; - - /// Fragment-specific scan options. - std::shared_ptr fragment_scan_options; - - /// Return a threaded or serial TaskGroup according to use_threads. - std::shared_ptr TaskGroup() const; -}; +struct ARROW_DS_EXPORT ScanContext {}; struct ARROW_DS_EXPORT ScanOptions { // Filter and projection @@ -78,6 +66,15 @@ struct ARROW_DS_EXPORT ScanOptions { // Maximum row count for scanned batches. int64_t batch_size = kDefaultBatchSize; + /// A pool from which materialized and scanned arrays will be allocated. + MemoryPool* pool = arrow::default_memory_pool(); + + /// Indicate if the Scanner should make use of a ThreadPool. + bool use_threads = false; + + /// Fragment-specific scan options. + std::shared_ptr fragment_scan_options; + // Return a vector of fields that requires materialization. // // This is usually the union of the fields referenced in the projection and the @@ -93,6 +90,9 @@ struct ARROW_DS_EXPORT ScanOptions { // This is used by Fragment implementations to apply the column // sub-selection optimization. std::vector MaterializedFields() const; + + /// Return a threaded or serial TaskGroup according to use_threads. + std::shared_ptr TaskGroup() const; }; /// \brief Read record batches from a range of a single data fragment. A @@ -200,6 +200,9 @@ class ARROW_DS_EXPORT Scanner { /// columns to materialize. class ARROW_DS_EXPORT ScannerBuilder { public: + ScannerBuilder(std::shared_ptr dataset, + std::shared_ptr scan_options); + ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context); @@ -253,6 +256,9 @@ class ARROW_DS_EXPORT ScannerBuilder { /// This option provides a control limiting the memory owned by any RecordBatch. Status BatchSize(int64_t batch_size); + /// \brief Set the pool from which materialized and scanned arrays will be allocated. 
+ Status Pool(MemoryPool* pool); + /// \brief Return the constructed now-immutable Scanner object Result> Finish(); diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 25c53fc8146..782af187aac 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -100,10 +100,10 @@ class FilterAndProjectScanTask : public ScanTask { SimplifyWithGuarantee(options()->projection, partition_)); RecordBatchIterator filter_it = - FilterRecordBatch(std::move(it), simplified_filter, context_->pool); + FilterRecordBatch(std::move(it), simplified_filter, options_->pool); return ProjectRecordBatch(std::move(filter_it), simplified_projection, - context_->pool); + options_->pool); } private: diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 0ceb5bc4434..a8ed6c3b2b0 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -140,13 +140,13 @@ TEST_F(TestScanner, ToTable) { auto scanner = MakeScanner(batch); std::shared_ptr actual; - ctx_->use_threads = false; + options_->use_threads = false; ASSERT_OK_AND_ASSIGN(actual, scanner.ToTable()); AssertTablesEqual(*expected, *actual); // There is no guarantee on the ordering when using multiple threads, but // since the RecordBatch is always the same it will pass. - ctx_->use_threads = true; + options_->use_threads = true; ASSERT_OK_AND_ASSIGN(actual, scanner.ToTable()); AssertTablesEqual(*expected, *actual); } diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 1c4e5d302c5..d1afb228750 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2163,16 +2163,11 @@ cdef class ScanTask(_Weakrefable): yield pyarrow_wrap_batch(record_batch) -cdef shared_ptr[CScanContext] _build_scan_context(bint use_threads=True, - MemoryPool memory_pool=None): +cdef shared_ptr[CScanContext] _build_scan_context(): cdef: shared_ptr[CScanContext] context context = make_shared[CScanContext]() - context.get().pool = maybe_unbox_memory_pool(memory_pool) - if use_threads is not None: - context.get().use_threads = use_threads - return context @@ -2181,7 +2176,9 @@ _DEFAULT_BATCH_SIZE = 2**20 cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, list columns=None, Expression filter=None, - int batch_size=_DEFAULT_BATCH_SIZE) except *: + int batch_size=_DEFAULT_BATCH_SIZE, + bint use_threads=True, + MemoryPool memory_pool=None) except *: cdef: CScannerBuilder *builder builder = ptr.get() @@ -2193,6 +2190,9 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, check_status(builder.Project([tobytes(c) for c in columns])) check_status(builder.BatchSize(batch_size)) + check_status(builder.UseThreads(use_threads)) + if memory_pool: + check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool))) cdef class Scanner(_Weakrefable): @@ -2261,11 +2261,11 @@ cdef class Scanner(_Weakrefable): shared_ptr[CScannerBuilder] builder shared_ptr[CScanner] scanner - context = _build_scan_context(use_threads=use_threads, - memory_pool=memory_pool) + context = _build_scan_context() builder = make_shared[CScannerBuilder](dataset.unwrap(), context) _populate_builder(builder, columns=columns, filter=filter, - batch_size=batch_size) + batch_size=batch_size, use_threads=use_threads, + memory_pool=memory_pool) scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) @@ -2280,15 +2280,15 @@ cdef class Scanner(_Weakrefable): shared_ptr[CScannerBuilder] 
builder shared_ptr[CScanner] scanner - context = _build_scan_context(use_threads=use_threads, - memory_pool=memory_pool) + context = _build_scan_context() schema = schema or fragment.physical_schema builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema), fragment.unwrap(), context) _populate_builder(builder, columns=columns, filter=filter, - batch_size=batch_size) + batch_size=batch_size, use_threads=use_threads, + memory_pool=memory_pool) scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index bbe545cf794..76504c32435 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -60,8 +60,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) cdef cppclass CScanContext "arrow::dataset::ScanContext": - c_bool use_threads - CMemoryPool * pool + pass ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \ "arrow::dataset::ScanTaskIterator" @@ -106,6 +105,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CStatus Project(const vector[c_string]& columns) CStatus Filter(CExpression filter) CStatus UseThreads(c_bool use_threads) + CStatus Pool(CMemoryPool* pool) CStatus BatchSize(int64_t batch_size) CResult[shared_ptr[CScanner]] Finish() shared_ptr[CSchema] schema() const diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 001cd9da0f6..83c7cbb844c 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -66,9 +66,9 @@ const char* r6_class_name::get( // [[dataset::export]] std::shared_ptr dataset___Dataset__NewScan( const std::shared_ptr& ds) { - auto context = std::make_shared(); - context->pool = gc_memory_pool(); - return ValueOrStop(ds->NewScan(std::move(context))); + auto options = std::make_shared(); + options->pool = gc_memory_pool(); + return ValueOrStop(ds->NewScan(std::move(options))); } // [[dataset::export]] From 149cb232613e8095791560f86b00f5f142b4567b Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 12 Mar 2021 14:00:30 -0500 Subject: [PATCH 3/4] ARROW-9749: [C++][Python][Dataset] Remove ScanContext --- cpp/src/arrow/dataset/dataset.cc | 14 ++---- cpp/src/arrow/dataset/dataset.h | 7 +-- cpp/src/arrow/dataset/file_base.cc | 9 ++-- cpp/src/arrow/dataset/file_base.h | 5 +-- cpp/src/arrow/dataset/file_csv.cc | 8 ++-- cpp/src/arrow/dataset/file_csv.h | 2 +- cpp/src/arrow/dataset/file_csv_test.cc | 8 ++-- cpp/src/arrow/dataset/file_ipc.cc | 22 +++------- cpp/src/arrow/dataset/file_ipc.h | 2 +- cpp/src/arrow/dataset/file_ipc_test.cc | 5 +-- cpp/src/arrow/dataset/file_parquet.cc | 12 +++-- cpp/src/arrow/dataset/file_parquet.h | 6 +-- cpp/src/arrow/dataset/file_parquet_test.cc | 9 ++-- cpp/src/arrow/dataset/scanner.cc | 31 +++++-------- cpp/src/arrow/dataset/scanner.h | 46 ++++++-------------- cpp/src/arrow/dataset/scanner_internal.h | 10 ++--- cpp/src/arrow/dataset/scanner_test.cc | 10 ++--- cpp/src/arrow/dataset/test_util.h | 15 +++---- cpp/src/arrow/dataset/type_fwd.h | 1 - cpp/src/jni/dataset/jni_wrapper.cc | 6 +-- python/pyarrow/_dataset.pyx | 19 ++------ python/pyarrow/includes/libarrow_dataset.pxd | 18 +++----- 22 files changed, 91 insertions(+), 174 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 436b891bd74..df155784924 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -69,8 +69,7 
@@ InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, : InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(), std::move(record_batches), std::move(partition_expression)) {} -Result InMemoryFragment::Scan(std::shared_ptr options, - std::shared_ptr context) { +Result InMemoryFragment::Scan(std::shared_ptr options) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. auto batches_it = MakeVectorIterator(record_batches_); @@ -86,8 +85,8 @@ Result InMemoryFragment::Scan(std::shared_ptr opt batches.push_back(batch->Slice(batch_size * i, batch_size)); } - return ::arrow::internal::make_unique( - std::move(batches), std::move(options), std::move(context), self); + return ::arrow::internal::make_unique(std::move(batches), + std::move(options), self); }; return MakeMapIterator(fn, std::move(batches_it)); @@ -102,13 +101,8 @@ Result> Dataset::NewScan( return std::make_shared(this->shared_from_this(), options); } -Result> Dataset::NewScan( - std::shared_ptr context) { - return std::make_shared(this->shared_from_this(), context); -} - Result> Dataset::NewScan() { - return NewScan(std::make_shared()); + return NewScan(std::make_shared()); } Result Dataset::GetFragments() { diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index b5af67e6239..a28b79840d6 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -62,8 +62,7 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { /// columns may be absent if they were not present in this fragment. /// /// To receive a record batch stream which is fully filtered and projected, use Scanner. - virtual Result Scan(std::shared_ptr options, - std::shared_ptr context) = 0; + virtual Result Scan(std::shared_ptr options) = 0; /// \brief Return true if the fragment can benefit from parallel scanning. virtual bool splittable() const = 0; @@ -110,8 +109,7 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { Expression = literal(true)); explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true)); - Result Scan(std::shared_ptr options, - std::shared_ptr context) override; + Result Scan(std::shared_ptr options) override; bool splittable() const override { return false; } @@ -132,7 +130,6 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: /// \brief Begin to build a new Scan operation against this Dataset Result> NewScan(std::shared_ptr options); - Result> NewScan(std::shared_ptr context); Result> NewScan(); /// \brief GetFragments returns an iterator of Fragments given a predicate. 
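A rough sketch of the consolidated API after this change (names taken from the diffs in this series; the surrounding helper function and the dataset variable are assumed for illustration, not part of the patch): per-scan state such as the memory pool and the threading flag now lives on ScanOptions, so a scan can be configured either by filling the options in directly or through the equivalent ScannerBuilder setters.

#include "arrow/api.h"
#include "arrow/dataset/api.h"

// Assumed helper for illustration: scan an entire dataset into a Table.
arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
    const std::shared_ptr<arrow::dataset::Dataset>& dataset) {
  auto options = std::make_shared<arrow::dataset::ScanOptions>();
  options->pool = arrow::default_memory_pool();  // was ScanContext::pool
  options->use_threads = true;                   // was ScanContext::use_threads
  ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan(std::move(options)));
  // The builder setters added in this series are equivalent:
  //   ARROW_RETURN_NOT_OK(builder->Pool(arrow::default_memory_pool()));
  //   ARROW_RETURN_NOT_OK(builder->UseThreads(true));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
  return scanner->ToTable();
}
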
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index ba6cd901305..eff3f5231d8 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -78,10 +78,9 @@ Result> FileFragment::ReadPhysicalSchemaImpl() { return format_->Inspect(source_); } -Result FileFragment::Scan(std::shared_ptr options, - std::shared_ptr context) { +Result FileFragment::Scan(std::shared_ptr options) { auto self = std::dynamic_pointer_cast(shared_from_this()); - return format_->ScanFile(std::move(options), std::move(context), self); + return format_->ScanFile(std::move(options), self); } struct FileSystemDataset::FragmentSubtrees { @@ -356,14 +355,12 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector()); ScanTaskVector scan_tasks; - auto context = std::make_shared(*scanner->context()); - for (const auto& fragment : fragments) { auto options = std::make_shared(*scanner->options()); // Avoid contention with multithreaded readers options->use_threads = false; ARROW_ASSIGN_OR_RAISE(auto scan_task_it, - Scanner(fragment, std::move(options), context).Scan()); + Scanner(fragment, std::move(options)).Scan()); for (auto maybe_scan_task : scan_task_it) { ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); scan_tasks.push_back(std::move(scan_task)); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 0d1dd9cd0bf..4b19f3083eb 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -137,7 +137,7 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& file) const = 0; /// \brief Open a fragment @@ -161,8 +161,7 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this Scan(std::shared_ptr options, - std::shared_ptr context) override; + Result Scan(std::shared_ptr options) override; std::string type_name() const override { return format_->type_name(); } std::string ToString() const override { return source_.path(); }; diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index d8d9439eba5..87d97ea7d40 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -140,9 +140,9 @@ static inline Result> OpenReader( class CsvScanTask : public ScanTask { public: CsvScanTask(std::shared_ptr format, - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, std::shared_ptr fragment) - : ScanTask(std::move(options), std::move(context), fragment), + : ScanTask(std::move(options), fragment), format_(std::move(format)), source_(fragment->source()) {} @@ -184,11 +184,11 @@ Result> CsvFileFormat::Inspect(const FileSource& source) } Result CsvFileFormat::ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const { auto this_ = checked_pointer_cast(shared_from_this()); auto task = std::make_shared(std::move(this_), std::move(options), - std::move(context), std::move(fragment)); + std::move(fragment)); return MakeVectorIterator>({std::move(task)}); } diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index b7a9388a324..e93ae1fa6ad 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -49,7 +49,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { /// 
\brief Open a file for scanning Result ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const override; Result> MakeWriter( diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index 19d792e6bef..7399ae71ec0 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -47,19 +47,17 @@ class TestCsvFileFormat : public testing::Test { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); return Batches(std::move(scan_task_it)); } void SetSchema(std::vector> fields) { - opts_ = std::make_shared(); opts_->dataset_schema = schema(std::move(fields)); ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names())); } std::shared_ptr format_ = std::make_shared(); - std::shared_ptr opts_; - std::shared_ptr ctx_ = std::make_shared(); + std::shared_ptr opts_ = std::make_shared(); }; TEST_F(TestCsvFileFormat, ScanRecordBatchReader) { @@ -188,7 +186,7 @@ N/A,bar auto dataset_schema = schema({field("betrayal_not_really_f64", not_float64), field("str", utf8())}); - ScannerBuilder builder(dataset_schema, fragment, ctx_); + ScannerBuilder builder(dataset_schema, fragment, opts_); // This filter is valid with declared schema, but would *not* be valid // if betrayal_not_really_f64 were read as double rather than string. diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 441ea0cd4d4..a81e8b74e86 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -76,9 +76,8 @@ static inline Result> GetIncludedFields( class IpcScanTask : public ScanTask { public: IpcScanTask(std::shared_ptr fragment, - std::shared_ptr options, std::shared_ptr context) - : ScanTask(std::move(options), std::move(context), fragment), - source_(fragment->source()) {} + std::shared_ptr options) + : ScanTask(std::move(options), fragment), source_(fragment->source()) {} Result Execute() override { struct Impl { @@ -118,10 +117,8 @@ class IpcScanTask : public ScanTask { class IpcScanTaskIterator { public: static Result Make(std::shared_ptr options, - std::shared_ptr context, std::shared_ptr fragment) { - return ScanTaskIterator( - IpcScanTaskIterator(std::move(options), std::move(context), std::move(fragment))); + return ScanTaskIterator(IpcScanTaskIterator(std::move(options), std::move(fragment))); } Result> Next() { @@ -131,20 +128,16 @@ class IpcScanTaskIterator { } once_ = true; - return std::shared_ptr(new IpcScanTask(fragment_, options_, context_)); + return std::shared_ptr(new IpcScanTask(fragment_, options_)); } private: IpcScanTaskIterator(std::shared_ptr options, - std::shared_ptr context, std::shared_ptr fragment) - : options_(std::move(options)), - context_(std::move(context)), - fragment_(std::move(fragment)) {} + : options_(std::move(options)), fragment_(std::move(fragment)) {} bool once_ = false; std::shared_ptr options_; - std::shared_ptr context_; std::shared_ptr fragment_; }; @@ -159,10 +152,9 @@ Result> IpcFileFormat::Inspect(const FileSource& source) } Result IpcFileFormat::ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const { - return IpcScanTaskIterator::Make(std::move(options), std::move(context), - std::move(fragment)); + return IpcScanTaskIterator::Make(std::move(options), 
std::move(fragment)); } // diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index c0e311ae3e7..cbfb6b858cd 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -49,7 +49,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { /// \brief Open a file for scanning Result ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const override; Result> MakeWriter( diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index e5347fa0cda..8a5fd024575 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -102,7 +102,7 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); return Batches(std::move(scan_task_it)); } @@ -115,7 +115,6 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin { protected: std::shared_ptr format_ = std::make_shared(); std::shared_ptr opts_; - std::shared_ptr ctx_ = std::make_shared(); }; TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { @@ -238,7 +237,7 @@ TEST_F(TestIpcFileSystemDataset, WriteExceedsMaxPartitions) { // require that no batch be grouped into more than 2 written batches: write_options_.max_partitions = 2; - auto scanner = std::make_shared(dataset_, scan_options_, scan_context_); + auto scanner = std::make_shared(dataset_, scan_options_); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("This exceeds the maximum"), FileSystemDataset::Write(write_options_, scanner)); } diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 672f90ca24f..d255787d55f 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -60,9 +60,8 @@ class ParquetScanTask : public ScanTask { std::vector pre_buffer_row_groups, arrow::io::IOContext io_context, arrow::io::CacheOptions cache_options, std::shared_ptr options, - std::shared_ptr context, std::shared_ptr fragment) - : ScanTask(std::move(options), std::move(context), std::move(fragment)), + : ScanTask(std::move(options), std::move(fragment)), row_group_(row_group), column_projection_(std::move(column_projection)), reader_(std::move(reader)), @@ -290,7 +289,7 @@ Result> ParquetFileFormat::Inspect( } Result> ParquetFileFormat::GetReader( - const FileSource& source, ScanOptions* options, ScanContext* context) const { + const FileSource& source, ScanOptions* options) const { MemoryPool* pool = options ? options->pool : default_memory_pool(); auto properties = MakeReaderProperties(*this, pool); @@ -321,7 +320,7 @@ Result> ParquetFileFormat::GetReader } Result ParquetFileFormat::ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const { auto* parquet_fragment = checked_cast(fragment.get()); std::vector row_groups; @@ -342,7 +341,7 @@ Result ParquetFileFormat::ScanFile( // Open the reader and pay the real IO cost. 
ARROW_ASSIGN_OR_RAISE(std::shared_ptr reader, - GetReader(fragment->source(), options.get(), context.get())); + GetReader(fragment->source(), options.get())); // Ensure that parquet_fragment has FileMetaData RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get())); @@ -365,8 +364,7 @@ Result ParquetFileFormat::ScanFile( for (size_t i = 0; i < row_groups.size(); ++i) { tasks[i] = std::make_shared( row_groups[i], column_projection, reader, pre_buffer_once, row_groups, - reader_options.io_context, reader_options.cache_options, options, context, - fragment); + reader_options.io_context, reader_options.cache_options, options, fragment); } return MakeVectorIterator(std::move(tasks)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index e72984ebdb5..869857e4d34 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -77,7 +77,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// members of parquet::ReaderProperties. /// /// We don't embed parquet::ReaderProperties directly because we get memory_pool from - /// ScanContext at scan time and provide differing defaults. + /// ScanOptions at scan time and provide differing defaults. /// /// @{ bool use_buffered_stream = false; @@ -114,7 +114,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \brief Open a file for scanning Result ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& file) const override; using FileFormat::MakeFragment; @@ -131,7 +131,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \brief Return a FileReader on the given source. Result> GetReader( - const FileSource& source, ScanOptions* = NULLPTR, ScanContext* = NULLPTR) const; + const FileSource& source, ScanOptions* = NULLPTR) const; Result> MakeWriter( std::shared_ptr destination, std::shared_ptr schema, diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 977af6be81a..cf14a2b7caf 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -157,7 +157,7 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); return Batches(std::move(scan_task_it)); } @@ -217,7 +217,6 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { protected: std::shared_ptr format_ = std::make_shared(); std::shared_ptr opts_; - std::shared_ptr ctx_ = std::make_shared(); }; TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { @@ -248,7 +247,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { format_->reader_options.dict_columns = {"utf8"}; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); + ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); int64_t row_count = 0; Schema expected_schema({field("utf8", dictionary(int32(), utf8()))}); @@ -275,7 +274,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderPreBuffer) { format_->reader_options.pre_buffer = true; ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); + ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); int64_t 
task_count = 0; int64_t row_count = 0; @@ -594,7 +593,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { EXPECT_RAISES_WITH_MESSAGE_THAT( IndexError, testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"), - row_groups_fragment({kNumRowGroups + 1})->Scan(opts_, ctx_)); + row_groups_fragment({kNumRowGroups + 1})->Scan(opts_)); } TEST_F(TestParquetFileFormat, WriteRecordBatchReader) { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 3ae0bb51d00..1aca9fa4882 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -77,48 +77,39 @@ Result Scanner::Scan() { // Iterator. The first Iterator::Next invocation is going to do // all the work of unwinding the chained iterators. ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments()); - return GetScanTaskIterator(std::move(fragment_it), scan_options_, scan_context_); + return GetScanTaskIterator(std::move(fragment_it), scan_options_); } Result ScanTaskIteratorFromRecordBatch( std::vector> batches, - std::shared_ptr options, std::shared_ptr context) { + std::shared_ptr options) { if (batches.empty()) { return MakeVectorIterator(ScanTaskVector()); } auto schema = batches[0]->schema(); auto fragment = std::make_shared(std::move(schema), std::move(batches)); - return fragment->Scan(std::move(options), std::move(context)); + return fragment->Scan(std::move(options)); } -ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_options) - : dataset_(std::move(dataset)), - fragment_(nullptr), - scan_options_(std::move(scan_options)), - scan_context_(std::make_shared()) { - scan_options_->dataset_schema = dataset_->schema(); - DCHECK_OK(Filter(literal(true))); -} +ScannerBuilder::ScannerBuilder(std::shared_ptr dataset) + : ScannerBuilder(std::move(dataset), std::make_shared()) {} ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context) + std::shared_ptr scan_options) : dataset_(std::move(dataset)), fragment_(nullptr), - scan_options_(std::make_shared()), - scan_context_(std::move(scan_context)) { + scan_options_(std::move(scan_options)) { scan_options_->dataset_schema = dataset_->schema(); DCHECK_OK(Filter(literal(true))); } ScannerBuilder::ScannerBuilder(std::shared_ptr schema, std::shared_ptr fragment, - std::shared_ptr scan_context) + std::shared_ptr scan_options) : dataset_(nullptr), fragment_(std::move(fragment)), - scan_options_(std::make_shared()), - scan_context_(std::move(scan_context)) { + scan_options_(std::move(scan_options)) { scan_options_->dataset_schema = std::move(schema); DCHECK_OK(Filter(literal(true))); } @@ -164,9 +155,9 @@ Result> ScannerBuilder::Finish() { } if (dataset_ == nullptr) { - return std::make_shared(fragment_, scan_options_, scan_context_); + return std::make_shared(fragment_, scan_options_); } - return std::make_shared(dataset_, scan_options_, scan_context_); + return std::make_shared(dataset_, scan_options_); } static inline RecordBatchVector FlattenRecordBatchVector( diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index b946d45e3ca..6e06af06066 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -38,9 +38,6 @@ namespace dataset { constexpr int64_t kDefaultBatchSize = 1 << 20; -/// \brief Shared state for a Scan operation -struct ARROW_DS_EXPORT ScanContext {}; - struct ARROW_DS_EXPORT ScanOptions { // Filter and projection Expression filter = literal(true); @@ -108,18 +105,13 @@ class ARROW_DS_EXPORT 
ScanTask { virtual ~ScanTask() = default; const std::shared_ptr& options() const { return options_; } - const std::shared_ptr& context() const { return context_; } const std::shared_ptr& fragment() const { return fragment_; } protected: - ScanTask(std::shared_ptr options, std::shared_ptr context, - std::shared_ptr fragment) - : options_(std::move(options)), - context_(std::move(context)), - fragment_(std::move(fragment)) {} + ScanTask(std::shared_ptr options, std::shared_ptr fragment) + : options_(std::move(options)), fragment_(std::move(fragment)) {} std::shared_ptr options_; - std::shared_ptr context_; std::shared_ptr fragment_; }; @@ -128,9 +120,8 @@ class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask { public: InMemoryScanTask(std::vector> record_batches, std::shared_ptr options, - std::shared_ptr context, std::shared_ptr fragment) - : ScanTask(std::move(options), std::move(context), std::move(fragment)), + : ScanTask(std::move(options), std::move(fragment)), record_batches_(std::move(record_batches)) {} Result Execute() override; @@ -141,7 +132,7 @@ class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask { ARROW_DS_EXPORT Result ScanTaskIteratorFromRecordBatch( std::vector> batches, - std::shared_ptr options, std::shared_ptr); + std::shared_ptr options); /// \brief Scanner is a materialized scan operation with context and options /// bound. A scanner is the class that glues ScanTask, Fragment, @@ -153,17 +144,11 @@ ARROW_DS_EXPORT Result ScanTaskIteratorFromRecordBatch( /// yield scan_task class ARROW_DS_EXPORT Scanner { public: - Scanner(std::shared_ptr dataset, std::shared_ptr scan_options, - std::shared_ptr scan_context) - : dataset_(std::move(dataset)), - scan_options_(std::move(scan_options)), - scan_context_(std::move(scan_context)) {} - - Scanner(std::shared_ptr fragment, std::shared_ptr scan_options, - std::shared_ptr scan_context) - : fragment_(std::move(fragment)), - scan_options_(std::move(scan_options)), - scan_context_(std::move(scan_context)) {} + Scanner(std::shared_ptr dataset, std::shared_ptr scan_options) + : dataset_(std::move(dataset)), scan_options_(std::move(scan_options)) {} + + Scanner(std::shared_ptr fragment, std::shared_ptr scan_options) + : fragment_(std::move(fragment)), scan_options_(std::move(scan_options)) {} /// \brief The Scan operator returns a stream of ScanTask. The caller is /// responsible to dispatch/schedule said tasks. Tasks should be safe to run @@ -185,14 +170,11 @@ class ARROW_DS_EXPORT Scanner { const std::shared_ptr& options() const { return scan_options_; } - const std::shared_ptr& context() const { return scan_context_; } - protected: std::shared_ptr dataset_; // TODO(ARROW-8065) remove fragment_ after a Dataset is constuctible from fragments std::shared_ptr fragment_; std::shared_ptr scan_options_; - std::shared_ptr scan_context_; }; /// \brief ScannerBuilder is a factory class to construct a Scanner. It is used @@ -200,14 +182,13 @@ class ARROW_DS_EXPORT Scanner { /// columns to materialize. class ARROW_DS_EXPORT ScannerBuilder { public: - ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_options); + explicit ScannerBuilder(std::shared_ptr dataset); ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context); + std::shared_ptr scan_options); ScannerBuilder(std::shared_ptr schema, std::shared_ptr fragment, - std::shared_ptr scan_context); + std::shared_ptr scan_options); /// \brief Set the subset of columns to materialize. 
/// @@ -245,7 +226,7 @@ class ARROW_DS_EXPORT ScannerBuilder { Status Filter(const Expression& filter); /// \brief Indicate if the Scanner should make use of the available - /// ThreadPool found in ScanContext; + /// ThreadPool found in ScanOptions; Status UseThreads(bool use_threads = true); /// \brief Set the maximum number of rows per RecordBatch. @@ -268,7 +249,6 @@ class ARROW_DS_EXPORT ScannerBuilder { std::shared_ptr dataset_; std::shared_ptr fragment_; std::shared_ptr scan_options_; - std::shared_ptr scan_context_; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 782af187aac..e666d251cd1 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -86,7 +86,7 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, class FilterAndProjectScanTask : public ScanTask { public: explicit FilterAndProjectScanTask(std::shared_ptr task, Expression partition) - : ScanTask(task->options(), task->context(), task->fragment()), + : ScanTask(task->options(), task->fragment()), task_(std::move(task)), partition_(std::move(partition)) {} @@ -114,12 +114,10 @@ class FilterAndProjectScanTask : public ScanTask { /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. inline Result GetScanTaskIterator( - FragmentIterator fragments, std::shared_ptr options, - std::shared_ptr context) { + FragmentIterator fragments, std::shared_ptr options) { // Fragment -> ScanTaskIterator - auto fn = [options, - context](std::shared_ptr fragment) -> Result { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options, context)); + auto fn = [options](std::shared_ptr fragment) -> Result { + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options)); auto partition = fragment->partition_expression(); // Apply the filter and/or projection to incoming RecordBatches by diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index a8ed6c3b2b0..66b1edff568 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -47,7 +47,7 @@ class TestScanner : public DatasetFixtureMixin { EXPECT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(batch->schema(), children)); - return Scanner{dataset, options_, ctx_}; + return Scanner{dataset, options_}; } void AssertScannerEqualsRepetitionsOf( @@ -123,7 +123,7 @@ TEST_F(TestScanner, MaterializeMissingColumn) { auto batch_with_f64 = RecordBatch::Make(schema_, f64->length(), {batch_missing_f64->column(0), f64}); - ScannerBuilder builder{schema_, fragment_missing_f64, ctx_}; + ScannerBuilder builder{schema_, fragment_missing_f64, options_}; ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); AssertScannerEqualsRepetitionsOf(*scanner, batch_with_f64); @@ -167,13 +167,13 @@ class TestScannerBuilder : public ::testing::Test { } protected: - std::shared_ptr ctx_ = std::make_shared(); + std::shared_ptr options_ = std::make_shared(); std::shared_ptr schema_; std::shared_ptr dataset_; }; TEST_F(TestScannerBuilder, TestProject) { - ScannerBuilder builder(dataset_, ctx_); + ScannerBuilder builder(dataset_, options_); // It is valid to request no columns, e.g. `SELECT 1 FROM t WHERE t.a > 0`. // still needs to touch the `a` column. 
@@ -200,7 +200,7 @@ TEST_F(TestScannerBuilder, TestProject) { } TEST_F(TestScannerBuilder, TestFilter) { - ScannerBuilder builder(dataset_, ctx_); + ScannerBuilder builder(dataset_, options_); ASSERT_OK(builder.Filter(literal(true))); ASSERT_OK(builder.Filter(equal(field_ref("i64"), literal(10)))); diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 248877e882e..6a4c1eb8d13 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -140,7 +140,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_, ctx_)); + ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_)); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); @@ -213,7 +213,6 @@ class DatasetFixtureMixin : public ::testing::Test { std::shared_ptr schema_; std::shared_ptr options_; - std::shared_ptr ctx_ = std::make_shared(); }; /// \brief A dummy FileFormat implementation @@ -237,7 +236,7 @@ class DummyFileFormat : public FileFormat { /// \brief Open a file for scanning (always returns an empty iterator) Result ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const override { return MakeEmptyIterator>(); } @@ -277,7 +276,7 @@ class JSONRecordBatchFileFormat : public FileFormat { /// \brief Open a file for scanning Result ScanFile( - std::shared_ptr options, std::shared_ptr context, + std::shared_ptr options, const std::shared_ptr& fragment) const override { ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open()); ARROW_ASSIGN_OR_RAISE(int64_t size, file->GetSize()); @@ -287,8 +286,7 @@ class JSONRecordBatchFileFormat : public FileFormat { ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source())); std::shared_ptr batch = RecordBatchFromJSON(schema, view); - return ScanTaskIteratorFromRecordBatch({batch}, std::move(options), - std::move(context)); + return ScanTaskIteratorFromRecordBatch({batch}, std::move(options)); } Result> MakeWriter( @@ -585,7 +583,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { void DoWrite(std::shared_ptr desired_partitioning) { write_options_.partitioning = desired_partitioning; - auto scanner = std::make_shared(dataset_, scan_options_, scan_context_); + auto scanner = std::make_shared(dataset_, scan_options_); ASSERT_OK(FileSystemDataset::Write(write_options_, scanner)); // re-discover the written dataset @@ -753,7 +751,7 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { } ASSERT_OK_AND_ASSIGN(auto scanner, ScannerBuilder(actual_physical_schema, fragment, - std::make_shared()) + std::make_shared()) .Finish()); ASSERT_OK_AND_ASSIGN(auto actual_table, scanner->ToTable()); ASSERT_OK_AND_ASSIGN(actual_table, actual_table->CombineChunks()); @@ -780,7 +778,6 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { std::shared_ptr written_; FileSystemDatasetWriteOptions write_options_; std::shared_ptr scan_options_; - std::shared_ptr scan_context_ = std::make_shared(); }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 9534de576e7..62395ad1a6e 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -76,7 +76,6 @@ class 
PartitioningOrFactory; class DirectoryPartitioning; class HivePartitioning; -struct ScanContext; struct ScanOptions; class Scanner; diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index 3003888fc1c..fe09dc44eca 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -431,13 +431,11 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann if (pool == nullptr) { JniThrow("Memory pool does not exist or has been closed"); } - std::shared_ptr context = - std::make_shared(); - context->pool = pool; std::shared_ptr dataset = RetrieveNativeInstance(dataset_id); std::shared_ptr scanner_builder = - JniGetOrThrow(dataset->NewScan(context)); + JniGetOrThrow(dataset->NewScan()); + JniAssertOkOrThrow(scanner_builder->Pool(pool)); std::vector column_vector = ToStringVector(env, columns); if (!column_vector.empty()) { diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index d1afb228750..79350d722bf 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -2163,14 +2163,6 @@ cdef class ScanTask(_Weakrefable): yield pyarrow_wrap_batch(record_batch) -cdef shared_ptr[CScanContext] _build_scan_context(): - cdef: - shared_ptr[CScanContext] context - - context = make_shared[CScanContext]() - return context - - _DEFAULT_BATCH_SIZE = 2**20 @@ -2257,12 +2249,11 @@ cdef class Scanner(_Weakrefable): list columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE): cdef: - shared_ptr[CScanContext] context + shared_ptr[CScanOptions] options = make_shared[CScanOptions]() shared_ptr[CScannerBuilder] builder shared_ptr[CScanner] scanner - context = _build_scan_context() - builder = make_shared[CScannerBuilder](dataset.unwrap(), context) + builder = make_shared[CScannerBuilder](dataset.unwrap(), options) _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, memory_pool=memory_pool) @@ -2276,16 +2267,14 @@ cdef class Scanner(_Weakrefable): list columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE): cdef: - shared_ptr[CScanContext] context + shared_ptr[CScanOptions] options = make_shared[CScanOptions]() shared_ptr[CScannerBuilder] builder shared_ptr[CScanner] scanner - context = _build_scan_context() - schema = schema or fragment.physical_schema builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema), - fragment.unwrap(), context) + fragment.unwrap(), options) _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, memory_pool=memory_pool) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 76504c32435..40da2d33d02 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -59,9 +59,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) - cdef cppclass CScanContext "arrow::dataset::ScanContext": - pass - ctypedef CIterator[shared_ptr[CScanTask]] CScanTaskIterator \ "arrow::dataset::ScanTaskIterator" @@ -70,8 +67,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CFragment "arrow::dataset::Fragment": CResult[shared_ptr[CSchema]] ReadPhysicalSchema() - CResult[CScanTaskIterator] Scan( - shared_ptr[CScanOptions] options, shared_ptr[CScanContext] context) + CResult[CScanTaskIterator] 
Scan(shared_ptr[CScanOptions] options) c_bool splittable() const c_string type_name() const const CExpression& partition_expression() const @@ -88,10 +84,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CExpression partition_expression) cdef cppclass CScanner "arrow::dataset::Scanner": - CScanner(shared_ptr[CDataset], shared_ptr[CScanOptions], - shared_ptr[CScanContext]) - CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions], - shared_ptr[CScanContext]) + CScanner(shared_ptr[CDataset], shared_ptr[CScanOptions]) + CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions]) CResult[CScanTaskIterator] Scan() CResult[shared_ptr[CTable]] ToTable() CResult[CFragmentIterator] GetFragments() @@ -99,9 +93,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder": CScannerBuilder(shared_ptr[CDataset], - shared_ptr[CScanContext] scan_context) + shared_ptr[CScanOptions] scan_options) CScannerBuilder(shared_ptr[CSchema], shared_ptr[CFragment], - shared_ptr[CScanContext] scan_context) + shared_ptr[CScanOptions] scan_options) CStatus Project(const vector[c_string]& columns) CStatus Filter(CExpression filter) CStatus UseThreads(c_bool use_threads) @@ -122,8 +116,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CDataset]] ReplaceSchema(shared_ptr[CSchema]) - CResult[shared_ptr[CScannerBuilder]] NewScanWithContext "NewScan"( - shared_ptr[CScanContext] context) CResult[shared_ptr[CScannerBuilder]] NewScan() cdef cppclass CUnionDataset "arrow::dataset::UnionDataset"( From 5897a97dcc5ae378cf2f026768c5f45fec18b092 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 12 Mar 2021 15:30:26 -0500 Subject: [PATCH 4/4] ARROW-9749: [GLib][Ruby] Remove ScanContext uses --- c_glib/arrow-dataset-glib/scanner.cpp | 221 ++---------------- c_glib/arrow-dataset-glib/scanner.h | 19 -- c_glib/arrow-dataset-glib/scanner.hpp | 6 - .../test/dataset/test-in-memory-scan-task.rb | 7 - c_glib/test/dataset/test-scan-context.rb | 33 --- c_glib/test/dataset/test-scan-options.rb | 10 + .../lib/arrow-dataset/in-memory-scan-task.rb | 3 +- 7 files changed, 32 insertions(+), 267 deletions(-) delete mode 100644 c_glib/test/dataset/test-scan-context.rb diff --git a/c_glib/arrow-dataset-glib/scanner.cpp b/c_glib/arrow-dataset-glib/scanner.cpp index 84d037b154f..4256dece2f8 100644 --- a/c_glib/arrow-dataset-glib/scanner.cpp +++ b/c_glib/arrow-dataset-glib/scanner.cpp @@ -38,8 +38,6 @@ G_BEGIN_DECLS * @title: Scanner classes * @include: arrow-dataset-glib/arrow-dataset-glib.h * - * #GADScanContext is a class for a scan context. - * * #GADScanOptions is a class for a set of scan options. * * #GADScanTask is an abstract class for a scan task. 
@@ -49,131 +47,6 @@ G_BEGIN_DECLS * Since: 1.0.0 */ -/* arrow::dataset::ScanContext */ - -typedef struct GADScanContextPrivate_ { - std::shared_ptr scan_context; -} GADScanContextPrivate; - -enum { - PROP_SCAN_CONTEXT = 1, - PROP_USE_THREADS, -}; - -G_DEFINE_TYPE_WITH_PRIVATE(GADScanContext, - gad_scan_context, - G_TYPE_OBJECT) - -#define GAD_SCAN_CONTEXT_GET_PRIVATE(obj) \ - static_cast( \ - gad_scan_context_get_instance_private( \ - GAD_SCAN_CONTEXT(obj))) - -static void -gad_scan_context_finalize(GObject *object) -{ - auto priv = GAD_SCAN_CONTEXT_GET_PRIVATE(object); - - priv->scan_context.~shared_ptr(); - - G_OBJECT_CLASS(gad_scan_context_parent_class)->finalize(object); -} - -static void -gad_scan_context_set_property(GObject *object, - guint prop_id, - const GValue *value, - GParamSpec *pspec) -{ - auto priv = GAD_SCAN_CONTEXT_GET_PRIVATE(object); - - switch (prop_id) { - case PROP_SCAN_CONTEXT: - priv->scan_context = - *static_cast *>(g_value_get_pointer(value)); - break; - case PROP_USE_THREADS: - priv->scan_context->use_threads = g_value_get_boolean(value); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); - break; - } -} - -static void -gad_scan_context_get_property(GObject *object, - guint prop_id, - GValue *value, - GParamSpec *pspec) -{ - auto priv = GAD_SCAN_CONTEXT_GET_PRIVATE(object); - - switch (prop_id) { - case PROP_USE_THREADS: - g_value_set_boolean(value, priv->scan_context->use_threads); - break; - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); - break; - } -} - -static void -gad_scan_context_init(GADScanContext *object) -{ - auto priv = GAD_SCAN_CONTEXT_GET_PRIVATE(object); - new(&priv->scan_context) std::shared_ptr; -} - -static void -gad_scan_context_class_init(GADScanContextClass *klass) -{ - auto gobject_class = G_OBJECT_CLASS(klass); - - gobject_class->finalize = gad_scan_context_finalize; - gobject_class->set_property = gad_scan_context_set_property; - gobject_class->get_property = gad_scan_context_get_property; - - auto scan_context = arrow::dataset::ScanContext(); - - GParamSpec *spec; - spec = g_param_spec_pointer("scan-context", - "ScanContext", - "The raw std::shared *", - static_cast(G_PARAM_WRITABLE | - G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_SCAN_CONTEXT, spec); - - /** - * GADScanContext:use-threads: - * - * Indicate if the Scanner should make use of a ThreadPool. - * - * Since: 1.0.0 - */ - spec = g_param_spec_boolean("use-threads", - "Use threads", - "Indicate if the Scanner should make use of a ThreadPool", - scan_context.use_threads, - static_cast(G_PARAM_READWRITE)); - g_object_class_install_property(gobject_class, PROP_USE_THREADS, spec); -} - -/** - * gad_scan_context_new: - * - * Returns: A newly created #GADScanContext. 
- * - * Since: 1.0.0 - */ -GADScanContext * -gad_scan_context_new(void) -{ - auto arrow_scan_context = std::make_shared(); - return gad_scan_context_new_raw(&arrow_scan_context); -} - /* arrow::dataset::ScanOptions */ typedef struct GADScanOptionsPrivate_ { @@ -186,6 +59,7 @@ enum { PROP_EVALUATOR, PROP_PROJECTOR, PROP_BATCH_SIZE, + PROP_USE_THREADS, }; G_DEFINE_TYPE_WITH_PRIVATE(GADScanOptions, @@ -223,6 +97,9 @@ gad_scan_options_set_property(GObject *object, case PROP_BATCH_SIZE: priv->scan_options->batch_size = g_value_get_int64(value); break; + case PROP_USE_THREADS: + priv->scan_options->use_threads = g_value_get_boolean(value); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -241,6 +118,9 @@ gad_scan_options_get_property(GObject *object, case PROP_BATCH_SIZE: g_value_set_int64(value, priv->scan_options->batch_size); break; + case PROP_USE_THREADS: + g_value_set_boolean(value, priv->scan_options->use_threads); + break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -294,6 +174,20 @@ gad_scan_options_class_init(GADScanOptionsClass *klass) scan_options->batch_size, static_cast(G_PARAM_READWRITE)); g_object_class_install_property(gobject_class, PROP_BATCH_SIZE, spec); + + /** + * GADScanOptions:use-threads: + * + * Indicate if the Scanner should make use of a ThreadPool. + * + * Since: 4.0.0 + */ + spec = g_param_spec_boolean("use-threads", + "Use threads", + "Indicate if the Scanner should make use of a ThreadPool", + scan_options->use_threads, + static_cast(G_PARAM_READWRITE)); + g_object_class_install_property(gobject_class, PROP_USE_THREADS, spec); } /** @@ -334,14 +228,12 @@ gad_scan_options_get_schema(GADScanOptions *scan_options) typedef struct GADScanTaskPrivate_ { std::shared_ptr scan_task; GADScanOptions *options; - GADScanContext *context; GADFragment *fragment; } GADScanTaskPrivate; enum { PROP_SCAN_TASK = 1, PROP_OPTIONS, - PROP_CONTEXT, PROP_FRAGMENT, }; @@ -364,11 +256,6 @@ gad_scan_task_dispose(GObject *object) priv->options = NULL; } - if (priv->context) { - g_object_unref(priv->context); - priv->context = NULL; - } - if (priv->fragment) { g_object_unref(priv->fragment); priv->fragment = NULL; @@ -403,9 +290,6 @@ gad_scan_task_set_property(GObject *object, case PROP_OPTIONS: priv->options = GAD_SCAN_OPTIONS(g_value_dup_object(value)); break; - case PROP_CONTEXT: - priv->context = GAD_SCAN_CONTEXT(g_value_dup_object(value)); - break; case PROP_FRAGMENT: priv->fragment = GAD_FRAGMENT(g_value_dup_object(value)); break; @@ -427,9 +311,6 @@ gad_scan_task_get_property(GObject *object, case PROP_OPTIONS: g_value_set_object(value, priv->options); break; - case PROP_CONTEXT: - g_value_set_object(value, priv->context); - break; case PROP_FRAGMENT: g_value_set_object(value, priv->fragment); break; @@ -479,21 +360,6 @@ gad_scan_task_class_init(GADScanTaskClass *klass) G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_OPTIONS, spec); - /** - * GADScanTask:context: - * - * The context of the scan task. - * - * Since: 1.0.0 - */ - spec = g_param_spec_object("context", - "Context", - "The context of the scan task", - GAD_TYPE_SCAN_CONTEXT, - static_cast(G_PARAM_READWRITE | - G_PARAM_CONSTRUCT_ONLY)); - g_object_class_install_property(gobject_class, PROP_CONTEXT, spec); - /** * GADScanTask:fragment: * @@ -531,27 +397,6 @@ gad_scan_task_get_options(GADScanTask *scan_task) return gad_scan_options_new_raw(&arrow_options); } -/** - * gad_scan_task_get_context: - * @scan_task: A #GADScanTask. 
- * - * Returns: (transfer full): A #GADScanContext. - * - * Since: 1.0.0 - */ -GADScanContext * -gad_scan_task_get_context(GADScanTask *scan_task) -{ - auto priv = GAD_SCAN_TASK_GET_PRIVATE(scan_task); - if (priv->context) { - g_object_ref(priv->context); - return priv->context; - } - - auto arrow_context = priv->scan_task->context(); - return gad_scan_context_new_raw(&arrow_context); -} - /** * gad_scan_task_get_fragment: * @scan_task: A #GADFragment. @@ -618,7 +463,6 @@ gad_in_memory_scan_task_class_init(GADInMemoryScanTaskClass *klass) * (element-type GArrowRecordBatch): The record batches of the table. * @n_record_batches: The number of record batches. * @options: A #GADScanOptions. - * @context: A #GADScanContext. * @fragment: A #GADInMemoryFragment. * * Returns: A newly created #GADInMemoryScanTask. @@ -629,7 +473,6 @@ GADInMemoryScanTask * gad_in_memory_scan_task_new(GArrowRecordBatch **record_batches, gsize n_record_batches, GADScanOptions *options, - GADScanContext *context, GADInMemoryFragment *fragment) { std::vector> arrow_record_batches; @@ -639,38 +482,18 @@ gad_in_memory_scan_task_new(GArrowRecordBatch **record_batches, arrow_record_batches.push_back(arrow_record_batch); } auto arrow_options = gad_scan_options_get_raw(options); - auto arrow_context = gad_scan_context_get_raw(context); auto arrow_fragment = gad_fragment_get_raw(GAD_FRAGMENT(fragment)); auto arrow_in_memory_scan_task = std::make_shared(arrow_record_batches, arrow_options, - arrow_context, arrow_fragment); return gad_in_memory_scan_task_new_raw(&arrow_in_memory_scan_task, options, - context, fragment); } G_END_DECLS -GADScanContext * -gad_scan_context_new_raw(std::shared_ptr *arrow_scan_context) -{ - auto scan_context = - GAD_SCAN_CONTEXT(g_object_new(GAD_TYPE_SCAN_CONTEXT, - "scan-context", arrow_scan_context, - NULL)); - return scan_context; -} - -std::shared_ptr -gad_scan_context_get_raw(GADScanContext *scan_context) -{ - auto priv = GAD_SCAN_CONTEXT_GET_PRIVATE(scan_context); - return priv->scan_context; -} - GADScanOptions * gad_scan_options_new_raw(std::shared_ptr *arrow_scan_options) { @@ -691,14 +514,12 @@ gad_scan_options_get_raw(GADScanOptions *scan_options) GADInMemoryScanTask * gad_in_memory_scan_task_new_raw(std::shared_ptr *arrow_in_memory_scan_task, GADScanOptions *options, - GADScanContext *context, GADInMemoryFragment *fragment) { auto in_memory_scan_task = GAD_IN_MEMORY_SCAN_TASK(g_object_new(GAD_TYPE_IN_MEMORY_SCAN_TASK, "scan-task", arrow_in_memory_scan_task, "options", options, - "context", context, "fragment", fragment, NULL)); return in_memory_scan_task; diff --git a/c_glib/arrow-dataset-glib/scanner.h b/c_glib/arrow-dataset-glib/scanner.h index 08e2d9a17f8..f387e8948f2 100644 --- a/c_glib/arrow-dataset-glib/scanner.h +++ b/c_glib/arrow-dataset-glib/scanner.h @@ -25,22 +25,6 @@ G_BEGIN_DECLS -/* arrow::dataset::ScanContext */ - -#define GAD_TYPE_SCAN_CONTEXT (gad_scan_context_get_type()) -G_DECLARE_DERIVABLE_TYPE(GADScanContext, - gad_scan_context, - GAD, - SCAN_CONTEXT, - GObject) -struct _GADScanContextClass -{ - GObjectClass parent_class; -}; - -GARROW_AVAILABLE_IN_1_0 -GADScanContext *gad_scan_context_new(void); - /* arrow::dataset::ScanOptions */ #define GAD_TYPE_SCAN_OPTIONS (gad_scan_options_get_type()) @@ -75,8 +59,6 @@ struct _GADScanTaskClass GARROW_AVAILABLE_IN_1_0 GADScanOptions *gad_scan_task_get_options(GADScanTask *scan_task); -GARROW_AVAILABLE_IN_1_0 -GADScanContext *gad_scan_task_get_context(GADScanTask *scan_task); GARROW_AVAILABLE_IN_4_0 GADFragment 
*gad_scan_task_get_fragment(GADScanTask *scan_task); GARROW_AVAILABLE_IN_1_0 @@ -101,7 +83,6 @@ GADInMemoryScanTask * gad_in_memory_scan_task_new(GArrowRecordBatch **record_batches, gsize n_record_batches, GADScanOptions *options, - GADScanContext *context, GADInMemoryFragment *fragment); G_END_DECLS diff --git a/c_glib/arrow-dataset-glib/scanner.hpp b/c_glib/arrow-dataset-glib/scanner.hpp index 001b606dff9..f10351ee99b 100644 --- a/c_glib/arrow-dataset-glib/scanner.hpp +++ b/c_glib/arrow-dataset-glib/scanner.hpp @@ -24,11 +24,6 @@ #include #include -GADScanContext * -gad_scan_context_new_raw(std::shared_ptr *arrow_scan_context); -std::shared_ptr -gad_scan_context_get_raw(GADScanContext *scan_context); - GADScanOptions * gad_scan_options_new_raw(std::shared_ptr *arrow_scan_options); std::shared_ptr @@ -37,5 +32,4 @@ gad_scan_options_get_raw(GADScanOptions *scan_options); GADInMemoryScanTask * gad_in_memory_scan_task_new_raw(std::shared_ptr *arrow_in_memory_scan_task, GADScanOptions *scan_options, - GADScanContext *scan_context, GADInMemoryFragment *fragment); diff --git a/c_glib/test/dataset/test-in-memory-scan-task.rb b/c_glib/test/dataset/test-in-memory-scan-task.rb index 0c763e1617b..06e3d0d2424 100644 --- a/c_glib/test/dataset/test-in-memory-scan-task.rb +++ b/c_glib/test/dataset/test-in-memory-scan-task.rb @@ -40,14 +40,11 @@ def setup @scan_options = ArrowDataset::ScanOptions.new(@schema) - @scan_context = ArrowDataset::ScanContext.new - @fragment = ArrowDataset::InMemoryFragment.new(@schema, @record_batches) @scan_task = ArrowDataset::InMemoryScanTask.new(@record_batches, @scan_options, - @scan_context, @fragment) end @@ -55,10 +52,6 @@ def test_scan_options assert_equal(@scan_options, @scan_task.options) end - def test_scan_context - assert_equal(@scan_context, @scan_task.context) - end - def test_execute assert_equal(@record_batches, @scan_task.execute.to_list) diff --git a/c_glib/test/dataset/test-scan-context.rb b/c_glib/test/dataset/test-scan-context.rb deleted file mode 100644 index 25a624e33fd..00000000000 --- a/c_glib/test/dataset/test-scan-context.rb +++ /dev/null @@ -1,33 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -class TestDatasetScanContext < Test::Unit::TestCase - def setup - omit("Arrow Dataset is required") unless defined?(ArrowDataset) - @scan_context = ArrowDataset::ScanContext.new - end - - def test_use_threads - assert do - not @scan_context.use_threads? - end - @scan_context.use_threads = true - assert do - @scan_context.use_threads? 
-    end
-  end
-end
diff --git a/c_glib/test/dataset/test-scan-options.rb b/c_glib/test/dataset/test-scan-options.rb
index a8bcd12afde..0536b2a7cca 100644
--- a/c_glib/test/dataset/test-scan-options.rb
+++ b/c_glib/test/dataset/test-scan-options.rb
@@ -34,4 +34,14 @@ def test_batch_size
     assert_equal(42, @scan_options.batch_size)
   end
+
+  def test_use_threads
+    assert do
+      not @scan_options.use_threads?
+    end
+    @scan_options.use_threads = true
+    assert do
+      @scan_options.use_threads?
+    end
+  end
 end
diff --git a/ruby/red-arrow-dataset/lib/arrow-dataset/in-memory-scan-task.rb b/ruby/red-arrow-dataset/lib/arrow-dataset/in-memory-scan-task.rb
index 122aaca4ae0..5e127e179c6 100644
--- a/ruby/red-arrow-dataset/lib/arrow-dataset/in-memory-scan-task.rb
+++ b/ruby/red-arrow-dataset/lib/arrow-dataset/in-memory-scan-task.rb
@@ -26,11 +26,10 @@ def initialize(record_batches, **options)
         end
         record_batch
       end
-      context = options.delete(:context) || ScanContext.new
      options[:schema] ||= record_batches.first.schema
      fragment = options.delete(:fragment)
      fragment ||= InMemoryFragment.new(options[:schema], record_batches)
-      initialize_raw(record_batches, options, context, fragment)
+      initialize_raw(record_batches, options, fragment)
     end
   end
 end
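For reference, a minimal sketch (not part of the patch series) of how a C++ scan is configured once ScanContext is gone: the builder carries everything the context used to hold. ScannerBuilder::UseThreads, ScannerBuilder::Pool, Dataset::NewScan, ScannerBuilder::Finish, and Scanner::ToTable are taken from the diffs above; the function name, includes, and error-handling style are illustrative.

// Illustrative only: scan a whole dataset with the consolidated,
// ScanOptions-based API. Assumes an already-constructed arrow::dataset::Dataset.
#include <memory>

#include "arrow/dataset/api.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Result<std::shared_ptr<arrow::Table>> ScanAll(
    const std::shared_ptr<arrow::dataset::Dataset>& dataset) {
  // NewScan() no longer takes a ScanContext.
  ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
  // use_threads now lives on ScanOptions, toggled through the builder.
  ARROW_RETURN_NOT_OK(builder->UseThreads(true));
  // The memory pool is set on the builder instead of ScanContext::pool.
  ARROW_RETURN_NOT_OK(builder->Pool(arrow::default_memory_pool()));
  ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
  return scanner->ToTable();
}

Folding use_threads and the memory pool into ScanOptions and ScannerBuilder leaves a single object to thread through Fragment::Scan, which is what the signature changes in the C++, JNI, Cython, GLib, and Ruby hunks above all reflect.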