diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc
index c869ec2e1a3..61c74e3cf6c 100644
--- a/cpp/src/arrow/csv/reader_test.cc
+++ b/cpp/src/arrow/csv/reader_test.cc
@@ -37,7 +37,6 @@
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
-
 using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;
 
 namespace csv {
diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc
index 2df34145cd9..e279d18eb3d 100644
--- a/cpp/src/arrow/dataset/dataset.cc
+++ b/cpp/src/arrow/dataset/dataset.cc
@@ -23,10 +23,13 @@
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/scanner.h"
 #include "arrow/table.h"
+#include "arrow/util/async_generator.h"
 #include "arrow/util/bit_util.h"
+#include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/make_unique.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 namespace dataset {
@@ -56,43 +59,34 @@ Result<std::shared_ptr<Schema>> InMemoryFragment::ReadPhysicalSchemaImpl() {
   return physical_schema_;
 }
 
-InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> schema,
-                                   RecordBatchVector record_batches,
+namespace {
+struct VectorIterable {
+  Result<RecordBatchGenerator> operator()() { return MakeVectorGenerator(batches); }
+  RecordBatchVector batches;
+};
+}  // namespace
+
+InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> physical_schema,
+                                   RecordBatchIterable get_batches,
                                    Expression partition_expression)
-    : Fragment(std::move(partition_expression), std::move(schema)),
-      record_batches_(std::move(record_batches)) {
+    : Fragment(std::move(partition_expression), std::move(physical_schema)),
+      get_batches_(std::move(get_batches)) {
   DCHECK_NE(physical_schema_, nullptr);
 }
 
-InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
+InMemoryFragment::InMemoryFragment(std::shared_ptr<Schema> physical_schema,
+                                   RecordBatchVector batches,
                                    Expression partition_expression)
-    : Fragment(std::move(partition_expression), /*schema=*/nullptr),
-      record_batches_(std::move(record_batches)) {
-  // Order of argument evaluation is undefined, so compute physical_schema here
-  physical_schema_ = record_batches_.empty() ? schema({}) : record_batches_[0]->schema();
+    : Fragment(std::move(partition_expression), std::move(physical_schema)),
+      get_batches_(VectorIterable{std::move(batches)}) {
+  DCHECK_NE(physical_schema_, nullptr);
 }
 
-Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options) {
-  // Make an explicit copy of record_batches_ to ensure Scan can be called
-  // multiple times.
-  auto batches_it = MakeVectorIterator(record_batches_);
-
-  auto batch_size = options->batch_size;
-  // RecordBatch -> ScanTask
+Future<ScanTaskVector> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options) {
   auto self = shared_from_this();
-  auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
-    RecordBatchVector batches;
-
-    auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
-    for (int i = 0; i < n_batches; i++) {
-      batches.push_back(batch->Slice(batch_size * i, batch_size));
-    }
-
-    return ::arrow::internal::make_unique<InMemoryScanTask>(std::move(batches),
-                                                            std::move(options), self);
-  };
-
-  return MakeMapIterator(fn, std::move(batches_it));
+  ScanTaskVector scan_tasks{std::make_shared<InMemoryScanTask>(
+      get_batches_, std::move(options), std::move(self))};
+  return Future<ScanTaskVector>::MakeFinished(scan_tasks);
 }
 
 Dataset::Dataset(std::shared_ptr<Schema> schema, Expression partition_expression)
@@ -108,92 +102,84 @@ Result<std::shared_ptr<ScannerBuilder>> Dataset::NewScan() {
   return NewScan(std::make_shared<ScanOptions>());
 }
 
-Result<FragmentIterator> Dataset::GetFragments() {
+Future<FragmentVector> Dataset::GetFragmentsAsync() const {
   ARROW_ASSIGN_OR_RAISE(auto predicate, literal(true).Bind(*schema_));
-  return GetFragments(std::move(predicate));
+  return GetFragmentsAsync(std::move(predicate));
 }
 
-Result<FragmentIterator> Dataset::GetFragments(Expression predicate) {
+Future<FragmentVector> Dataset::GetFragmentsAsync(Expression predicate) const {
   ARROW_ASSIGN_OR_RAISE(
      predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
   return predicate.IsSatisfiable() ? GetFragmentsImpl(std::move(predicate))
-                                   : MakeEmptyIterator<std::shared_ptr<Fragment>>();
+                                   : FragmentVector{};
 }
 
-struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
-  explicit VectorRecordBatchGenerator(RecordBatchVector batches)
-      : batches_(std::move(batches)) {}
-
-  RecordBatchIterator Get() const final { return MakeVectorIterator(batches_); }
+namespace {
 
-  RecordBatchVector batches_;
+struct TableIterable {
+  Result<RecordBatchGenerator> operator()() {
+    auto reader = std::make_shared<TableBatchReader>(*table);
+    return [reader] { return reader->Next(); };
+  }
+  std::shared_ptr<Table> table;
 };
 
-InMemoryDataset::InMemoryDataset(std::shared_ptr<Schema> schema,
-                                 RecordBatchVector batches)
-    : Dataset(std::move(schema)),
-      get_batches_(new VectorRecordBatchGenerator(std::move(batches))) {}
-
-struct TableRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
-  explicit TableRecordBatchGenerator(std::shared_ptr<Table> table)
-      : table_(std::move(table)) {}
+struct ReaderIterableState {
+  explicit ReaderIterableState(std::shared_ptr<RecordBatchReader> reader)
+      : reader(std::move(reader)), consumed(0) {}
 
-  RecordBatchIterator Get() const final {
-    auto reader = std::make_shared<TableBatchReader>(*table_);
-    auto table = table_;
-    return MakeFunctionIterator([reader, table] { return reader->Next(); });
+  std::shared_ptr<RecordBatchReader> reader;
+  std::atomic<int> consumed;
+};
+struct ReaderIterable {
+  explicit ReaderIterable(std::shared_ptr<RecordBatchReader> reader)
+      : state(std::make_shared<ReaderIterableState>(std::move(reader))) {}
+
+  Result<RecordBatchGenerator> operator()() {
+    if (state->consumed.fetch_or(1)) {
+      return Status::Invalid(
+          "A dataset created from a RecordBatchReader can only be scanned once");
+    }
+    auto reader_capture = state->reader;
+    return [reader_capture] { return reader_capture->Next(); };
   }
-
-  std::shared_ptr<Table> table_;
+  std::shared_ptr<ReaderIterableState> state;
 };
 
-InMemoryDataset::InMemoryDataset(std::shared_ptr<Table> table)
-    : Dataset(table->schema()),
-      get_batches_(new TableRecordBatchGenerator(std::move(table))) {}
-
-struct ReaderRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
-  explicit ReaderRecordBatchGenerator(std::shared_ptr<RecordBatchReader> reader)
-      : reader_(std::move(reader)), consumed_(false) {}
+}  // namespace
 
-  RecordBatchIterator Get() const final {
-    if (consumed_) {
-      return MakeErrorIterator<std::shared_ptr<RecordBatch>>(Status::Invalid(
-          "RecordBatchReader-backed InMemoryDataset was already consumed"));
-    }
-    consumed_ = true;
-    auto reader = reader_;
-    return MakeFunctionIterator([reader] { return reader->Next(); });
-  }
+std::shared_ptr<InMemoryDataset> InMemoryDataset::FromTable(
+    std::shared_ptr<Table> table) {
+  auto schema = table->schema();
+  return std::make_shared<InMemoryDataset>(std::move(schema),
+                                           TableIterable{std::move(table)});
+}
 
-  std::shared_ptr<RecordBatchReader> reader_;
-  mutable bool consumed_;
-};
+std::shared_ptr<InMemoryDataset> InMemoryDataset::FromReader(
+    std::shared_ptr<RecordBatchReader> reader) {
+  auto schema = reader->schema();
+  return std::make_shared<InMemoryDataset>(std::move(schema),
+                                           ReaderIterable{std::move(reader)});
+}
 
-InMemoryDataset::InMemoryDataset(std::shared_ptr<RecordBatchReader> reader)
-    : Dataset(reader->schema()),
-      get_batches_(new ReaderRecordBatchGenerator(std::move(reader))) {}
+std::shared_ptr<InMemoryDataset> InMemoryDataset::FromBatches(
+    std::shared_ptr<Schema> schema, RecordBatchVector batches) {
+  return std::make_shared<InMemoryDataset>(std::move(schema),
+                                           VectorIterable{std::move(batches)});
+}
 
 Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
     std::shared_ptr<Schema> schema) const {
   RETURN_NOT_OK(CheckProjectable(*schema_, *schema));
-  return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
+  return std::make_shared<InMemoryDataset>(std::move(schema), std::move(get_batches_));
 }
 
-Result<FragmentIterator> InMemoryDataset::GetFragmentsImpl(Expression) {
+Future<FragmentVector> InMemoryDataset::GetFragmentsImpl(Expression) const {
   auto schema = this->schema();
 
-  auto create_fragment =
-      [schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
-    if (!batch->schema()->Equals(schema)) {
-      return Status::TypeError("yielded batch had schema ", *batch->schema(),
-                               " which did not match InMemorySource's: ", *schema);
-    }
-
-    RecordBatchVector batches{batch};
-    return std::make_shared<InMemoryFragment>(std::move(batches));
-  };
-
-  return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
+  FragmentVector fragments{std::make_shared<InMemoryFragment>(schema, get_batches_)};
+  return Future<FragmentVector>::MakeFinished(std::move(fragments));
 }
 
 Result<std::shared_ptr<UnionDataset>> UnionDataset::Make(std::shared_ptr<Schema> schema,
@@ -220,7 +206,7 @@ Result<std::shared_ptr<Dataset>> UnionDataset::ReplaceSchema(
       new UnionDataset(std::move(schema), std::move(children)));
 }
 
-Result<FragmentIterator> UnionDataset::GetFragmentsImpl(Expression predicate) {
+Future<FragmentVector> UnionDataset::GetFragmentsImpl(Expression predicate) const {
   return GetFragmentsFromDatasets(children_, predicate);
 }
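The `RecordBatchIterable` alias introduced in dataset.h below replaces the old virtual `RecordBatchGenerator` class: any callable returning `Result<RecordBatchGenerator>` can now back an `InMemoryDataset` or `InMemoryFragment`. A minimal sketch of a custom iterable, not part of this patch (the `example` namespace and `RepeatableIterable` name are hypothetical):

```cpp
#include "arrow/dataset/dataset.h"
#include "arrow/util/async_generator.h"

namespace example {

// Re-yields a fixed vector of batches on every call, mirroring the
// VectorIterable helper added in dataset.cc above. Because the generator is
// recreated per call, the fragment can be scanned repeatedly.
struct RepeatableIterable {
  arrow::Result<arrow::dataset::RecordBatchGenerator> operator()() {
    return arrow::MakeVectorGenerator(batches);
  }
  arrow::RecordBatchVector batches;
};

}  // namespace example
```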
diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index 6be83059fc1..4199944d8b2 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -28,12 +28,18 @@
 #include "arrow/dataset/expression.h"
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/dataset/visibility.h"
+#include "arrow/util/async_generator.h"
+#include "arrow/util/future.h"
+#include "arrow/util/iterator.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/mutex.h"
 
 namespace arrow {
 namespace dataset {
 
+using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;
+using RecordBatchIterable = std::function<Result<RecordBatchGenerator>()>;
+
 /// \brief A granular piece of a Dataset, such as an individual file.
 ///
 /// A Fragment can be read/scanned separately from other fragments. It yields a
@@ -52,7 +58,7 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
   /// The schema is cached after being read once, or may be specified at construction.
   Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
 
-  /// \brief Scan returns an iterator of ScanTasks, each of which yields
+  /// \brief Scan returns a future yielding a vector of ScanTasks, each of which yields
   /// RecordBatches from this Fragment.
   ///
   /// Note that batches yielded using this method will not be filtered and may not align
@@ -62,10 +68,7 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
   ///   columns may be absent if they were not present in this fragment.
   ///
   /// To receive a record batch stream which is fully filtered and projected, use Scanner.
-  virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0;
-
-  /// \brief Return true if the fragment can benefit from parallel scanning.
-  virtual bool splittable() const = 0;
+  virtual Future<ScanTaskVector> Scan(std::shared_ptr<ScanOptions> options) = 0;
 
   virtual std::string type_name() const = 0;
 
   virtual std::string ToString() const { return type_name(); }
@@ -105,20 +108,21 @@ class ARROW_DS_EXPORT FragmentScanOptions {
 /// RecordBatch.
 class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
  public:
-  InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
+  InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchIterable get_batches,
                    Expression = literal(true));
-  explicit InMemoryFragment(RecordBatchVector record_batches, Expression = literal(true));
-
-  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
+  InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector batches,
+                   Expression = literal(true));
+  // explicit InMemoryFragment(RecordBatchIterable get_batches, Expression =
+  // literal(true));
 
-  bool splittable() const override { return false; }
+  Future<ScanTaskVector> Scan(std::shared_ptr<ScanOptions> options) override;
 
   std::string type_name() const override { return "in-memory"; }
 
 protected:
   Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
 
-  RecordBatchVector record_batches_;
+  RecordBatchIterable get_batches_;
 };
 
 /// \brief A container of zero or more Fragments.
@@ -133,8 +137,19 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
   Result<std::shared_ptr<ScannerBuilder>> NewScan();
 
   /// \brief GetFragments returns an iterator of Fragments given a predicate.
-  Result<FragmentIterator> GetFragments(Expression predicate);
-  Result<FragmentIterator> GetFragments();
+  Future<FragmentVector> GetFragmentsAsync(Expression predicate) const;
+  Result<FragmentIterator> GetFragments(Expression predicate) const {
+    auto fut = GetFragmentsAsync(predicate);
+    fut.Wait();
+    ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
+    return MakeVectorIterator(fragments_vec);
+  }
+  Future<FragmentVector> GetFragmentsAsync() const;
+  Result<FragmentIterator> GetFragments() const {
+    auto fut = GetFragmentsAsync();
+    ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
+    return MakeVectorIterator(fragments_vec);
+  }
 
   const std::shared_ptr<Schema>& schema() const { return schema_; }
@@ -159,7 +174,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
   Dataset(std::shared_ptr<Schema> schema, Expression partition_expression);
 
-  virtual Result<FragmentIterator> GetFragmentsImpl(Expression predicate) = 0;
+  virtual Future<FragmentVector> GetFragmentsImpl(Expression predicate) const = 0;
 
   std::shared_ptr<Schema> schema_;
   Expression partition_expression_ = literal(true);
@@ -170,31 +185,24 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
 /// The record batches must match the schema provided to the source at construction.
 class ARROW_DS_EXPORT InMemoryDataset : public Dataset {
  public:
-  class RecordBatchGenerator {
-   public:
-    virtual ~RecordBatchGenerator() = default;
-    virtual RecordBatchIterator Get() const = 0;
-  };
-
-  InMemoryDataset(std::shared_ptr<Schema> schema,
-                  std::shared_ptr<RecordBatchGenerator> get_batches)
+  InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchIterable get_batches)
      : Dataset(std::move(schema)), get_batches_(std::move(get_batches)) {}
 
-  // Convenience constructor taking a fixed list of batches
-  InMemoryDataset(std::shared_ptr<Schema> schema, RecordBatchVector batches);
-
-  explicit InMemoryDataset(std::shared_ptr<Table> table);
-  explicit InMemoryDataset(std::shared_ptr<RecordBatchReader> reader);
-
   std::string type_name() const override { return "in-memory"; }
 
   Result<std::shared_ptr<Dataset>> ReplaceSchema(
       std::shared_ptr<Schema> schema) const override;
 
+  static std::shared_ptr<InMemoryDataset> FromTable(std::shared_ptr<Table> table);
+  static std::shared_ptr<InMemoryDataset> FromReader(
+      std::shared_ptr<RecordBatchReader> reader);
+  static std::shared_ptr<InMemoryDataset> FromBatches(std::shared_ptr<Schema> schema,
+                                                      RecordBatchVector batches);
+
  protected:
-  Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
+  Future<FragmentVector> GetFragmentsImpl(Expression predicate) const override;
 
-  std::shared_ptr<RecordBatchGenerator> get_batches_;
+  RecordBatchIterable get_batches_;
 };
 
 /// \brief A Dataset wrapping child Datasets.
@@ -216,7 +224,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset {
       std::shared_ptr<Schema> schema) const override;
 
  protected:
-  Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
+  Future<FragmentVector> GetFragmentsImpl(Expression predicate) const override;
 
   explicit UnionDataset(std::shared_ptr<Schema> schema, DatasetVector children)
       : Dataset(std::move(schema)), children_(std::move(children)) {}
diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h
index b28bf7a14a4..3569f7e2cf7 100644
--- a/cpp/src/arrow/dataset/dataset_internal.h
+++ b/cpp/src/arrow/dataset/dataset_internal.h
@@ -29,34 +29,31 @@
 #include "arrow/record_batch.h"
 #include "arrow/scalar.h"
 #include "arrow/type.h"
+#include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/optional.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 namespace dataset {
 
 /// \brief GetFragmentsFromDatasets transforms a vector<Dataset> into a
-/// flattened FragmentIterator.
-inline Result<FragmentIterator> GetFragmentsFromDatasets(const DatasetVector& datasets,
-                                                         Expression predicate) {
-  // Iterator<Dataset>
-  auto datasets_it = MakeVectorIterator(datasets);
-
-  // Dataset -> Iterator<Fragment>
-  auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> Result<FragmentIterator> {
-    return dataset->GetFragments(predicate);
+/// flattened vector<Fragment>.
+inline Future<FragmentVector> GetFragmentsFromDatasets(const DatasetVector& datasets,
+                                                       Expression predicate) {
+  // Dataset -> Future<FragmentVector>
+  auto fn = [predicate](std::shared_ptr<Dataset> dataset) -> Future<FragmentVector> {
+    return dataset->GetFragmentsAsync(predicate);
   };
 
-  // Iterator<Iterator<Fragment>>
-  auto fragments_it = MakeMaybeMapIterator(fn, std::move(datasets_it));
+  auto fragment_futures = internal::MapVector(fn, datasets);
 
-  // Iterator<Fragment>
-  return MakeFlattenIterator(std::move(fragments_it));
-}
-
-inline RecordBatchIterator IteratorFromReader(
-    const std::shared_ptr<RecordBatchReader>& reader) {
-  return MakeFunctionIterator([reader] { return reader->Next(); });
+  return All(fragment_futures)
+      .Then([](const std::vector<Result<FragmentVector>>& fragment_vecs)
+                -> Result<FragmentVector> {
+        ARROW_ASSIGN_OR_RAISE(auto unwrapped_vecs,
+                              internal::UnwrapOrRaise(fragment_vecs));
+        return internal::FlattenVectors(std::move(unwrapped_vecs));
+      });
 }
 
 inline std::shared_ptr<Schema> SchemaFromColumnNames(
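For reference, a hedged sketch of how the new factory methods and the async/sync fragment-discovery pair are meant to be consumed (the `Example` function and its input are hypothetical; the APIs are the ones declared above):

```cpp
#include "arrow/dataset/dataset.h"
#include "arrow/table.h"

// `table` is assumed to be a pre-built arrow::Table.
arrow::Status Example(std::shared_ptr<arrow::Table> table) {
  auto dataset = arrow::dataset::InMemoryDataset::FromTable(std::move(table));

  // The asynchronous path returns a Future<FragmentVector>...
  auto fragments_fut = dataset->GetFragmentsAsync();

  // ...while the legacy synchronous path simply waits on that future and
  // wraps the resulting vector in an Iterator.
  ARROW_ASSIGN_OR_RAISE(auto fragments_it, dataset->GetFragments());
  (void)fragments_it;

  return fragments_fut.status();  // blocks until the future completes
}
```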
diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc
index 1db96b8b5c3..ab01cd8e400 100644
--- a/cpp/src/arrow/dataset/dataset_test.cc
+++ b/cpp/src/arrow/dataset/dataset_test.cc
@@ -43,7 +43,7 @@ TEST_F(TestInMemoryFragment, Scan) {
   // Creates an InMemoryFragment of the same repeated batch.
   RecordBatchVector batches = {static_cast<size_t>(kNumberBatches), batch};
 
-  auto fragment = std::make_shared<InMemoryFragment>(batches);
+  auto fragment = std::make_shared<InMemoryFragment>(schema_, batches);
 
   AssertFragmentEquals(reader.get(), fragment.get());
 }
@@ -58,7 +58,7 @@ TEST_F(TestInMemoryDataset, ReplaceSchema) {
   auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
   auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
 
-  auto dataset = std::make_shared<InMemoryDataset>(
+  auto dataset = InMemoryDataset::FromBatches(
       schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});
 
   // drop field
@@ -88,12 +88,16 @@ TEST_F(TestInMemoryDataset, FromReader) {
   auto source_reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
   auto target_reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
 
-  auto dataset = std::make_shared<InMemoryDataset>(source_reader);
+  auto dataset = InMemoryDataset::FromReader(source_reader);
 
   AssertDatasetEquals(target_reader.get(), dataset.get());
 
   // Such datasets can only be scanned once
   ASSERT_OK_AND_ASSIGN(auto fragments, dataset->GetFragments());
-  ASSERT_RAISES(Invalid, fragments.Next());
+  ASSERT_OK_AND_ASSIGN(auto fragment, fragments.Next());
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto scan_tasks,
+                                fragment->Scan(std::make_shared<ScanOptions>()));
+  ASSERT_EQ(1, scan_tasks.size());
+  ASSERT_RAISES(Invalid, scan_tasks[0]->ExecuteAsync());
 }
 
 TEST_F(TestInMemoryDataset, GetFragments) {
@@ -104,7 +108,7 @@ TEST_F(TestInMemoryDataset, GetFragments) {
   auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
   auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch);
 
-  auto dataset = std::make_shared<InMemoryDataset>(
+  auto dataset = InMemoryDataset::FromBatches(
       schema_, RecordBatchVector{static_cast<size_t>(kNumberBatches), batch});
 
   AssertDatasetEquals(reader.get(), dataset.get());
@@ -119,7 +123,7 @@ TEST_F(TestInMemoryDataset, InMemoryFragment) {
 
   // Regression test: previously this constructor relied on undefined behavior (order of
   // evaluation of arguments) leading to fragments being constructed with empty schemas
-  auto fragment = std::make_shared<InMemoryFragment>(batches);
+  auto fragment = std::make_shared<InMemoryFragment>(batches[0]->schema(), batches);
 
   ASSERT_OK_AND_ASSIGN(auto schema, fragment->ReadPhysicalSchema());
   AssertSchemaEqual(batch->schema(), schema);
 }
@@ -137,8 +141,8 @@ TEST_F(TestUnionDataset, ReplaceSchema) {
                               batch};
 
   DatasetVector children = {
-      std::make_shared<InMemoryDataset>(schema_, batches),
-      std::make_shared<InMemoryDataset>(schema_, batches),
+      InMemoryDataset::FromBatches(schema_, batches),
+      InMemoryDataset::FromBatches(schema_, batches),
   };
 
   const int64_t total_batches = children.size() * kNumberBatches;
@@ -179,7 +183,7 @@ TEST_F(TestUnionDataset, GetFragments) {
 
   // Creates a complete binary tree of depth kCompleteBinaryTreeDepth where the
   // leaves are InMemoryDataset containing kChildPerNode fragments.
-  auto l1_leaf_dataset = std::make_shared<InMemoryDataset>(
+  auto l1_leaf_dataset = InMemoryDataset::FromBatches(
       schema_, RecordBatchVector{static_cast<size_t>(kChildPerNode), batch});
 
   ASSERT_OK_AND_ASSIGN(
@@ -211,8 +215,8 @@ TEST_F(TestUnionDataset, TrivialScan) {
                               batch};
 
   DatasetVector children = {
-      std::make_shared<InMemoryDataset>(schema_, batches),
-      std::make_shared<InMemoryDataset>(schema_, batches),
+      InMemoryDataset::FromBatches(schema_, batches),
+      InMemoryDataset::FromBatches(schema_, batches),
   };
 
   const int64_t total_batches = children.size() * kNumberBatches;
diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc
index a51b3c09971..c011066d42c 100644
--- a/cpp/src/arrow/dataset/discovery_test.cc
+++ b/cpp/src/arrow/dataset/discovery_test.cc
@@ -26,6 +26,7 @@
 #include "arrow/dataset/partition.h"
 #include "arrow/dataset/test_util.h"
 #include "arrow/filesystem/test_util.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/checked_cast.h"
@@ -71,8 +72,8 @@ class MockDatasetFactory : public DatasetFactory {
   }
 
   Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override {
-    return std::make_shared<InMemoryDataset>(options.schema,
-                                             std::vector<std::shared_ptr<RecordBatch>>{});
+    return InMemoryDataset::FromBatches(options.schema,
+                                        std::vector<std::shared_ptr<RecordBatch>>{});
   }
 
  protected:
@@ -140,8 +141,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
     options_->dataset_schema = schema;
     ASSERT_OK(SetProjection(options_.get(), schema->field_names()));
     ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
-    ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments());
-    AssertFragmentsAreFromPath(std::move(fragment_it), paths);
+    ASSERT_OK_AND_ASSIGN(auto fragments_it, dataset_->GetFragments());
+    AssertFragmentsAreFromPath(std::move(fragments_it), paths);
   }
 
  protected:
@@ -373,9 +374,8 @@ TEST_F(FileSystemDatasetFactoryTest, FilenameNotPartOfPartitions) {
   auto expected = equal(field_ref("first"), literal("one"));
   ASSERT_OK_AND_ASSIGN(auto dataset, factory_->Finish());
-  ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments());
-  for (const auto& maybe_fragment : fragment_it) {
-    ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment);
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto fragments, dataset->GetFragmentsAsync());
+  for (const auto& fragment : fragments) {
     EXPECT_EQ(fragment->partition_expression(), expected);
   }
 }
diff --git a/cpp/src/arrow/dataset/expression_test.cc b/cpp/src/arrow/dataset/expression_test.cc
index 2ab796b052f..33f7bc8c8ac 100644
--- a/cpp/src/arrow/dataset/expression_test.cc
+++ b/cpp/src/arrow/dataset/expression_test.cc
@@ -29,6 +29,7 @@
 #include "arrow/compute/registry.h"
 #include "arrow/dataset/expression_internal.h"
 #include "arrow/dataset/test_util.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 
 using testing::HasSubstr;
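With `Fragment::Scan` now returning `Future<ScanTaskVector>`, callers attach continuations instead of iterating. A small sketch under that assumption (`CountScanTasks` is a hypothetical helper, not part of the patch):

```cpp
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/scanner.h"

// Counts the scan tasks a fragment will produce, without blocking: the body
// runs as a continuation once the Scan() future completes.
arrow::Future<int> CountScanTasks(
    std::shared_ptr<arrow::dataset::Fragment> fragment,
    std::shared_ptr<arrow::dataset::ScanOptions> options) {
  return fragment->Scan(std::move(options))
      .Then([](const arrow::dataset::ScanTaskVector& tasks) -> arrow::Result<int> {
        return static_cast<int>(tasks.size());
      });
}
```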
diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 8437c75ae1c..e5366565812 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -34,6 +34,7 @@
 #include "arrow/io/interfaces.h"
 #include "arrow/io/memory.h"
 #include "arrow/util/compression.h"
+#include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 #include "arrow/util/logging.h"
 #include "arrow/util/make_unique.h"
@@ -106,7 +107,7 @@ Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
   return format_->Inspect(source_);
 }
 
-Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options) {
+Future<ScanTaskVector> FileFragment::Scan(std::shared_ptr<ScanOptions> options) {
   auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
   return format_->ScanFile(std::move(options), self);
 }
@@ -207,10 +208,11 @@ void FileSystemDataset::SetupSubtreePruning() {
   });
 }
 
-Result<FragmentIterator> FileSystemDataset::GetFragmentsImpl(Expression predicate) {
+Future<FragmentVector> FileSystemDataset::GetFragmentsImpl(Expression predicate) const {
   if (predicate == literal(true)) {
     // trivial predicate; skip subtree pruning
-    return MakeVectorIterator(FragmentVector(fragments_.begin(), fragments_.end()));
+    return Future<FragmentVector>::MakeFinished(
+        FragmentVector(fragments_.begin(), fragments_.end()));
   }
 
   std::vector<int> fragment_indices;
@@ -244,7 +246,7 @@ Result<FragmentIterator> FileSystemDataset::GetFragmentsImpl(Expression predicat
   std::transform(fragment_indices.begin(), fragment_indices.end(), fragments.begin(),
                  [this](int i) { return fragments_[i]; });
 
-  return MakeVectorIterator(std::move(fragments));
+  return Future<FragmentVector>::MakeFinished(std::move(fragments));
 }
 
 Status FileWriter::Write(RecordBatchReader* batches) {
@@ -369,7 +371,7 @@ struct WriteState {
   std::unordered_map<std::string, std::shared_ptr<WriteQueue>> queues;
 };
 
-Status WriteNextBatch(WriteState& state, const std::shared_ptr<ScanTask>& scan_task,
+Status WriteNextBatch(WriteState& state, const Expression& fragment_partition_expression,
                       std::shared_ptr<RecordBatch> batch) {
   ARROW_ASSIGN_OR_RAISE(auto groups, state.write_options.partitioning->Partition(batch));
   batch.reset();  // drop to hopefully conserve memory
@@ -382,8 +384,8 @@ Status WriteNextBatch(WriteState& state,
   std::unordered_set<WriteQueue*> need_flushed;
   for (size_t i = 0; i < groups.batches.size(); ++i) {
-    auto partition_expression = and_(std::move(groups.expressions[i]),
-                                     scan_task->fragment()->partition_expression());
+    auto partition_expression =
+        and_(std::move(groups.expressions[i]), fragment_partition_expression);
     auto batch = std::move(groups.batches[i]);
 
     ARROW_ASSIGN_OR_RAISE(auto part,
@@ -439,51 +441,22 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
   // an in-memory table to disk.
   ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments());
   ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector());
-  ScanTaskVector scan_tasks;
-  std::vector<Future<>> scan_futs;
-
-  for (const auto& fragment : fragments) {
-    auto options = std::make_shared<ScanOptions>(*scanner->options());
-    // Avoid contention with multithreaded readers
-    options->use_threads = false;
-    ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
-                          Scanner(fragment, std::move(options)).Scan());
-    for (auto maybe_scan_task : scan_task_it) {
-      ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
-      scan_tasks.push_back(std::move(scan_task));
-    }
-  }
 
   // Store a mapping from partitions (represented by their formatted partition
   // expressions) to a WriteQueue which flushes batches into that partition's output
   // file. In principle any thread could produce a batch for any partition, so each task
   // alternates between pushing batches and flushing them to disk.
-  WriteState state(write_options);
-
-  for (const auto& scan_task : scan_tasks) {
-    if (scan_task->supports_async()) {
-      ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync());
-      std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
-          [&, scan_task](std::shared_ptr<RecordBatch> batch) {
-            return WriteNextBatch(state, scan_task, std::move(batch));
-          };
-      scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
-    } else {
-      task_group->Append([&, scan_task] {
-        ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
+  ARROW_ASSIGN_OR_RAISE(auto batches_it, scanner->ScanBatches());
 
-        for (auto maybe_batch : batches) {
-          ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-          RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
-        }
+  WriteState state(write_options);
 
-        return Status::OK();
-      });
-    }
+  for (auto maybe_batch : batches_it) {
+    ARROW_ASSIGN_OR_RAISE(auto positioned_batch, maybe_batch);
+    auto fragment_partition_expression =
+        positioned_batch.fragment->partition_expression();
+    RETURN_NOT_OK(WriteNextBatch(state, fragment_partition_expression,
+                                 positioned_batch.record_batch));
   }
 
-  RETURN_NOT_OK(task_group->Finish());
-  auto scan_futs_all_done = AllComplete(scan_futs);
-  RETURN_NOT_OK(scan_futs_all_done.status());
 
   task_group = scanner->options()->TaskGroup();
   for (const auto& part_queue : state.queues) {
diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h
index e4e7167aa75..3eee84d22e2 100644
--- a/cpp/src/arrow/dataset/file_base.h
+++ b/cpp/src/arrow/dataset/file_base.h
@@ -134,9 +134,6 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileFormat> {
-  /// \brief Return true if fragments of this format can benefit from parallel scanning.
-  virtual bool splittable() const { return false; }
-
-  virtual Result<ScanTaskIterator> ScanFile(
+  virtual Future<ScanTaskVector> ScanFile(
       std::shared_ptr<ScanOptions> options,
       const std::shared_ptr<FileFragment>& file) const = 0;
 
@@ -172,11 +169,10 @@ class ARROW_DS_EXPORT FileFragment : public Fragment {
-  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
+  Future<ScanTaskVector> Scan(std::shared_ptr<ScanOptions> options) override;
 
   std::string type_name() const override { return format_->type_name(); }
   std::string ToString() const override { return source_.path(); };
-  bool splittable() const override { return format_->splittable(); }
 
   const FileSource& source() const { return source_; }
   const std::shared_ptr<FileFormat>& format() const { return format_; }
@@ -251,7 +247,7 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset {
   FileSystemDataset(std::shared_ptr<Schema> schema, Expression partition_expression)
       : Dataset(std::move(schema), partition_expression) {}
 
-  Result<FragmentIterator> GetFragmentsImpl(Expression predicate) override;
+  Future<FragmentVector> GetFragmentsImpl(Expression predicate) const override;
 
   void SetupSubtreePruning();
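`FileSystemDataset::Write` above now drains `scanner->ScanBatches()` directly instead of juggling sync and async scan tasks. A sketch of that consumption pattern (`VisitBatches` is hypothetical; the `PositionedRecordBatch` fields are the ones introduced in scanner.cc later in this diff):

```cpp
#include "arrow/dataset/scanner.h"
#include "arrow/status.h"

// Iterates the scan output in order; each element carries both the batch and
// the fragment it came from, which is all the partitioned writer needs.
arrow::Status VisitBatches(const std::shared_ptr<arrow::dataset::Scanner>& scanner) {
  ARROW_ASSIGN_OR_RAISE(auto batches_it, scanner->ScanBatches());
  for (auto maybe_batch : batches_it) {
    ARROW_ASSIGN_OR_RAISE(auto positioned, maybe_batch);
    // e.g. route positioned.record_batch based on
    // positioned.fragment->partition_expression()
  }
  return arrow::Status::OK();
}
```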
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index b55c23dfdef..c0a57de6d3d 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -123,28 +123,30 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
       input, io::BufferedInputStream::Create(reader_options.block_size,
                                              default_memory_pool(), std::move(input)));
 
-  auto peek_fut = DeferNotOk(input->io_context().executor()->Submit(
-      [input, reader_options] { return input->Peek(reader_options.block_size); }));
-
-  return peek_fut.Then([=](const util::string_view& first_block)
-                           -> Future<std::shared_ptr<csv::StreamingReader>> {
-    const auto& parse_options = format.parse_options;
-    auto convert_options = csv::ConvertOptions::Defaults();
-    if (scan_options != nullptr) {
-      ARROW_ASSIGN_OR_RAISE(convert_options,
-                            GetConvertOptions(format, scan_options, first_block, pool));
-    }
-
-    return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
-                                           reader_options,
-                                           parse_options, convert_options)
-        .Then(
+  // Grab the first block and use it to determine the schema and create a reader. The
+  // input->Peek call blocks so we run the whole thing on the I/O thread pool.
+  return DeferNotOk(input->io_context().executor()->Submit(
+      [=]() -> Future<std::shared_ptr<csv::StreamingReader>> {
+        ARROW_ASSIGN_OR_RAISE(auto first_block, input->Peek(reader_options.block_size));
+        const auto& parse_options = format.parse_options;
+        auto convert_options = csv::ConvertOptions::Defaults();
+        if (scan_options != nullptr) {
+          ARROW_ASSIGN_OR_RAISE(convert_options, GetConvertOptions(format, scan_options,
+                                                                   first_block, pool));
+        }
+
+        auto reader_fut = csv::StreamingReader::MakeAsync(
+            io::default_io_context(), std::move(input), reader_options, parse_options,
+            convert_options);
+        // Adds the filename to the error
+        return reader_fut.Then(
             [](const std::shared_ptr<csv::StreamingReader>& maybe_reader)
                 -> Result<std::shared_ptr<csv::StreamingReader>> { return maybe_reader; },
            [source](const Status& err) -> Result<std::shared_ptr<csv::StreamingReader>> {
               return err.WithMessage("Could not open CSV input source '", source.path(),
                                      "': ", err);
             });
-  });
+      }));
 }
 
 static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
@@ -165,15 +167,11 @@ class CsvScanTask : public ScanTask {
         format_(std::move(format)),
         source_(fragment->source()) {}
 
-  Result<RecordBatchIterator> Execute() override {
-    ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
-    return MakeGeneratorIterator(std::move(gen));
-  }
-
-  bool supports_async() const override { return true; }
-
   Result<RecordBatchGenerator> ExecuteAsync() override {
     auto reader_fut = OpenReaderAsync(source_, *format_, options(), options()->pool);
+    // OpenReaderAsync runs on the I/O thread pool so we want to get back on the CPU
+    // thread pool
+    auto transferred_fut = internal::GetCpuThreadPool()->Transfer(reader_fut);
+    auto generator_fut = transferred_fut.Then(
         [](const std::shared_ptr<csv::StreamingReader>& reader) -> RecordBatchGenerator {
           return [reader]() { return reader->ReadNextAsync(); };
@@ -212,14 +210,14 @@ Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source)
   return reader->schema();
 }
 
-Result<ScanTaskIterator> CsvFileFormat::ScanFile(
+Future<ScanTaskVector> CsvFileFormat::ScanFile(
     std::shared_ptr<ScanOptions> options,
     const std::shared_ptr<FileFragment>& fragment) const {
   auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
   auto task = std::make_shared<CsvScanTask>(std::move(this_), std::move(options),
                                             std::move(fragment));
 
-  return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
+  return Future<ScanTaskVector>::MakeFinished(ScanTaskVector{std::move(task)});
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
index b235195c5e3..2cad326713f 100644
--- a/cpp/src/arrow/dataset/file_csv.h
+++ b/cpp/src/arrow/dataset/file_csv.h
@@ -49,7 +49,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(
+  Future<ScanTaskVector> ScanFile(
       std::shared_ptr<ScanOptions> options,
       const std::shared_ptr<FileFragment>& fragment) const override;
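The CSV changes move the blocking `Peek` onto the I/O executor and transfer continuations back to the CPU pool. A stripped-down sketch of that Submit/Transfer pattern (`ReadOnIoPool` and `DoBlockingRead` are hypothetical names):

```cpp
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

// Runs a blocking callable on the given I/O executor and hands the resulting
// future back to the CPU thread pool, so continuations do not occupy an I/O
// thread.
arrow::Future<int64_t> ReadOnIoPool(
    arrow::internal::Executor* io_executor,
    std::function<arrow::Result<int64_t>()> DoBlockingRead) {
  auto fut = arrow::DeferNotOk(io_executor->Submit(std::move(DoBlockingRead)));
  return arrow::internal::GetCpuThreadPool()->Transfer(std::move(fut));
}
```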
diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc
index 99ca7cc0f42..26eacec2495 100644
--- a/cpp/src/arrow/dataset/file_csv_test.cc
+++ b/cpp/src/arrow/dataset/file_csv_test.cc
@@ -30,6 +30,7 @@
 #include "arrow/io/memory.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/record_batch.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
 
@@ -73,15 +74,19 @@ class TestCsvFileFormat : public testing::TestWithParam<Compression::type> {
     return internal::make_unique<FileSource>(info, fs, GetCompression());
   }
 
-  RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
-    return MakeFlattenIterator(MakeMaybeMapIterator(
-        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
-        std::move(scan_task_it)));
+  RecordBatchVector Batches(ScanTaskVector scan_tasks) {
+    RecordBatchVector rbs;
+    for (auto&& scan_task : scan_tasks) {
+      EXPECT_OK_AND_ASSIGN(auto task_rbs_it, scan_task->Execute());
+      EXPECT_OK_AND_ASSIGN(auto task_rbs, task_rbs_it.ToVector());
+      rbs.insert(rbs.end(), task_rbs.begin(), task_rbs.end());
+    }
+    return rbs;
   }
 
-  RecordBatchIterator Batches(Fragment* fragment) {
-    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
-    return Batches(std::move(scan_task_it));
+  RecordBatchVector Batches(Fragment* fragment) {
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
+    return Batches(std::move(scan_tasks));
   }
 
   void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
@@ -104,8 +109,7 @@ N/A
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
   }
 
@@ -126,8 +130,7 @@ bar)");
   opts_->fragment_scan_options = fragment_scan_options;
   int64_t null_count = 0;
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (const auto& batch : Batches(fragment.get())) {
     null_count += batch->GetColumnByName("str")->null_count();
   }
 
@@ -151,8 +154,7 @@ bar)");
   {
     int64_t rows = 0;
-    for (auto maybe_batch : Batches(fragment.get())) {
-      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+    for (const auto& batch : Batches(fragment.get())) {
       rows += batch->GetColumnByName("str")->length();
     }
     ASSERT_EQ(rows, 4);
@@ -163,8 +165,7 @@ bar)");
     fragment_scan_options->read_options.block_size = 1 << 22;
     opts_->fragment_scan_options = fragment_scan_options;
     int64_t rows = 0;
-    for (auto maybe_batch : Batches(fragment.get())) {
-      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+    for (const auto& batch : Batches(fragment.get())) {
      rows += batch->GetColumnByName("header_skipped")->length();
     }
     ASSERT_EQ(rows, 5);
@@ -186,8 +187,7 @@ N/A
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     AssertSchemaEqual(*batch->schema(), *physical_schema);
     row_count += batch->num_rows();
   }
@@ -270,16 +270,12 @@ N/A,bar
   ASSERT_OK(builder.Project({"str"}));
   ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish());
 
-  ASSERT_OK_AND_ASSIGN(auto scan_task_it, scanner->Scan());
-  for (auto maybe_scan_task : scan_task_it) {
-    ASSERT_OK_AND_ASSIGN(auto scan_task, maybe_scan_task);
-    ASSERT_OK_AND_ASSIGN(auto batch_it, scan_task->Execute());
-    for (auto maybe_batch : batch_it) {
-      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
-      // Run through the scan checking for errors to ensure that "f64" is read with the
-      // specified type and does not revert to the inferred type (if it reverts to
-      // inferring float64 then evaluation of the comparison expression should break)
-    }
+  ASSERT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches());
+  for (auto maybe_batch : batch_it) {
+    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+    // Run through the scan checking for errors to ensure that "f64" is read with the
+    // specified type and does not revert to the inferred type (if it reverts to
+    // inferring float64 then evaluation of the comparison expression should break)
   }
 }
diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc
index a81e8b74e86..4e685752e9d 100644
--- a/cpp/src/arrow/dataset/file_ipc.cc
+++ b/cpp/src/arrow/dataset/file_ipc.cc
@@ -25,9 +25,11 @@
 #include "arrow/dataset/dataset_internal.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/dataset/scanner.h"
+#include "arrow/dataset/type_fwd.h"
 #include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
 #include "arrow/util/iterator.h"
 
 namespace arrow {
@@ -79,7 +81,7 @@ class IpcScanTask : public ScanTask {
               std::shared_ptr<ScanOptions> options)
       : ScanTask(std::move(options), fragment), source_(fragment->source()) {}
 
-  Result<RecordBatchIterator> Execute() override {
+  Result<RecordBatchGenerator> ExecuteAsync() override {
     struct Impl {
       static Result<RecordBatchIterator> Make(
          const FileSource& source, std::vector<std::string> materialized_fields,
@@ -97,9 +99,11 @@ class IpcScanTask : public ScanTask {
 
       Result<std::shared_ptr<RecordBatch>> Next() {
         if (i_ == reader_->num_record_batches()) {
-          return nullptr;
+          return IterationEnd<std::shared_ptr<RecordBatch>>();
        }
 
+        // TODO(ARROW-11772) Once RBFR is async then switch over to that instead of this
+        // synchronous wrapper
         return reader_->ReadRecordBatch(i_++);
       }
 
       std::shared_ptr<ipc::RecordBatchFileReader> reader_;
       int i_;
     };
 
-    return Impl::Make(source_, options_->MaterializedFields(), options_->pool);
+    ARROW_ASSIGN_OR_RAISE(
+        auto rb_it, Impl::Make(source_, options_->MaterializedFields(), options_->pool));
+    ARROW_ASSIGN_OR_RAISE(
+        auto bg_gen,
+        MakeBackgroundGenerator(std::move(rb_it), options_->io_context.executor()));
+    return MakeTransferredGenerator(std::move(bg_gen), options_->cpu_executor);
   }
 
  private:
   FileSource source_;
 };
 
-class IpcScanTaskIterator {
- public:
-  static Result<ScanTaskIterator> Make(std::shared_ptr<ScanOptions> options,
-                                       std::shared_ptr<FileFragment> fragment) {
-    return ScanTaskIterator(IpcScanTaskIterator(std::move(options), std::move(fragment)));
-  }
-
-  Result<std::shared_ptr<ScanTask>> Next() {
-    if (once_) {
-      // Iteration is done.
-      return nullptr;
-    }
-
-    once_ = true;
-    return std::shared_ptr<ScanTask>(new IpcScanTask(fragment_, options_));
-  }
-
- private:
-  IpcScanTaskIterator(std::shared_ptr<ScanOptions> options,
-                      std::shared_ptr<FileFragment> fragment)
-      : options_(std::move(options)), fragment_(std::move(fragment)) {}
-
-  bool once_ = false;
-  std::shared_ptr<ScanOptions> options_;
-  std::shared_ptr<FileFragment> fragment_;
-};
-
 Result<bool> IpcFileFormat::IsSupported(const FileSource& source) const {
   RETURN_NOT_OK(source.Open().status());
   return OpenReader(source).ok();
@@ -151,10 +133,11 @@ Result<std::shared_ptr<Schema>> IpcFileFormat::Inspect(const FileSource& source)
   return reader->schema();
 }
 
-Result<ScanTaskIterator> IpcFileFormat::ScanFile(
+Future<ScanTaskVector> IpcFileFormat::ScanFile(
     std::shared_ptr<ScanOptions> options,
     const std::shared_ptr<FileFragment>& fragment) const {
-  return IpcScanTaskIterator::Make(std::move(options), std::move(fragment));
+  return Future<ScanTaskVector>::MakeFinished(
+      ScanTaskVector{std::make_shared<IpcScanTask>(fragment, std::move(options))});
 }
 
 //
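`IpcScanTask::ExecuteAsync` above composes `MakeBackgroundGenerator` and `MakeTransferredGenerator`; the same recipe works for any blocking `Iterator`. A generic sketch of that pattern (`ToAsync` is a hypothetical helper):

```cpp
#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/thread_pool.h"

// Pulls items from a blocking Iterator on an I/O executor, then delivers each
// item's continuation on the CPU executor.
template <typename T>
arrow::Result<arrow::AsyncGenerator<T>> ToAsync(
    arrow::Iterator<T> it, arrow::internal::Executor* io_executor,
    arrow::internal::Executor* cpu_executor) {
  ARROW_ASSIGN_OR_RAISE(auto bg_gen,
                        arrow::MakeBackgroundGenerator(std::move(it), io_executor));
  return arrow::MakeTransferredGenerator(std::move(bg_gen), cpu_executor);
}
```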
diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h
index cbfb6b858cd..570d26cf90d 100644
--- a/cpp/src/arrow/dataset/file_ipc.h
+++ b/cpp/src/arrow/dataset/file_ipc.h
@@ -40,15 +40,13 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat {
     return type_name() == other.type_name();
   }
 
-  bool splittable() const override { return true; }
-
   Result<bool> IsSupported(const FileSource& source) const override;
 
   /// \brief Return the schema of the file if possible.
   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(
+  Future<ScanTaskVector> ScanFile(
       std::shared_ptr<ScanOptions> options,
       const std::shared_ptr<FileFragment>& fragment) const override;
 
diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc
index 8a5fd024575..53590a95812 100644
--- a/cpp/src/arrow/dataset/file_ipc_test.cc
+++ b/cpp/src/arrow/dataset/file_ipc_test.cc
@@ -95,15 +95,19 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin {
     return std::make_shared<FileSource>(buffer);
   }
 
-  RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
-    return MakeFlattenIterator(MakeMaybeMapIterator(
-        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
-        std::move(scan_task_it)));
+  RecordBatchVector Batches(ScanTaskVector scan_tasks) {
+    RecordBatchVector rbs;
+    for (auto&& scan_task : scan_tasks) {
+      EXPECT_OK_AND_ASSIGN(auto task_rbs_it, scan_task->Execute());
+      EXPECT_OK_AND_ASSIGN(auto task_rbs, task_rbs_it.ToVector());
+      rbs.insert(rbs.end(), task_rbs.begin(), task_rbs.end());
+    }
+    return rbs;
   }
 
-  RecordBatchIterator Batches(Fragment* fragment) {
-    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
-    return Batches(std::move(scan_task_it));
+  RecordBatchVector Batches(Fragment* fragment) {
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
+    return Batches(std::move(scan_tasks));
   }
 
   void SetSchema(std::vector<std::shared_ptr<Field>> fields) {
@@ -126,8 +130,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
   }
 
@@ -147,8 +150,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderWithVirtualColumn) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     AssertSchemaEqual(*batch->schema(), *physical_schema);
     row_count += batch->num_rows();
   }
@@ -277,8 +279,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
     AssertSchemaEqual(*batch->schema(), *expected_schema,
                       /*check_metadata=*/false);
@@ -318,8 +319,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
     AssertSchemaEqual(*batch->schema(), *expected_schema,
                       /*check_metadata=*/false);
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index d255787d55f..50964429e2b 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -70,14 +70,14 @@ class ParquetScanTask : public ScanTask {
         io_context_(io_context),
         cache_options_(cache_options) {}
 
-  Result<RecordBatchIterator> Execute() override {
+  Result<RecordBatchGenerator> ExecuteAsync() override {
     // The construction of parquet's RecordBatchReader is deferred here to
     // control the memory usage of consumers who materialize all ScanTasks
     // before dispatching them, e.g. for scheduling purposes.
     //
     // The memory and IO incurred by the RecordBatchReader is allocated only
     // when Execute is called.
-    struct {
+    struct GetNextBatch {
       Result<std::shared_ptr<RecordBatch>> operator()() const {
         return record_batch_reader->Next();
       }
 
       // since it must outlive the wrapped RecordBatchReader
       std::shared_ptr<parquet::arrow::FileReader> file_reader;
       std::unique_ptr<RecordBatchReader> record_batch_reader;
-    } NextBatch;
+    };
+    auto next_batch = std::make_shared<GetNextBatch>();
 
     RETURN_NOT_OK(EnsurePreBuffered());
-    NextBatch.file_reader = reader_;
+    next_batch->file_reader = reader_;
     RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_}, column_projection_,
-                                                &NextBatch.record_batch_reader));
-    return MakeFunctionIterator(std::move(NextBatch));
+                                                &next_batch->record_batch_reader));
+    // TODO(ARROW-11843)
+    RecordBatchGenerator batch_generator = [next_batch]() { return (*next_batch)(); };
+    return batch_generator;
   }
 
   // Ensure that pre-buffering has been applied to the underlying Parquet reader
@@ -319,7 +322,7 @@ Result<std::shared_ptr<parquet::arrow::FileReader>> ParquetFileFormat::GetReader
   return std::move(arrow_reader);
 }
 
-Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
+Future<ScanTaskVector> ParquetFileFormat::ScanFile(
     std::shared_ptr<ScanOptions> options,
     const std::shared_ptr<FileFragment>& fragment) const {
   auto* parquet_fragment = checked_cast<ParquetFileFragment*>(fragment.get());
@@ -367,7 +370,7 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(
         reader_options.io_context, reader_options.cache_options, options, fragment);
   }
 
-  return MakeVectorIterator(std::move(tasks));
+  return Future<ScanTaskVector>::MakeFinished(std::move(tasks));
 }
 
 Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 869857e4d34..de10bb32a70 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -68,8 +68,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
 
   std::string type_name() const override { return "parquet"; }
 
-  bool splittable() const override { return true; }
-
   bool Equals(const FileFormat& other) const override;
 
   struct ReaderOptions {
@@ -113,7 +111,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;
 
   /// \brief Open a file for scanning
-  Result<ScanTaskIterator> ScanFile(
+  Future<ScanTaskVector> ScanFile(
       std::shared_ptr<ScanOptions> options,
       const std::shared_ptr<FileFragment>& file) const override;
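`ParquetScanTask` keeps its `FileReader` alive by capturing shared state in the generator lambda. A reduced sketch of that ownership pattern (`WrapReader` is hypothetical; like the ARROW-11843 TODO notes, it is still synchronous underneath):

```cpp
#include "arrow/record_batch.h"
#include "arrow/util/async_generator.h"

// Wraps a synchronous RecordBatchReader as an AsyncGenerator. The shared_ptr
// captured by the lambda keeps the reader alive for as long as the generator
// is held; each call produces an already-finished future.
inline arrow::AsyncGenerator<std::shared_ptr<arrow::RecordBatch>> WrapReader(
    std::shared_ptr<arrow::RecordBatchReader> reader) {
  return [reader]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
    return arrow::Future<std::shared_ptr<arrow::RecordBatch>>::MakeFinished(
        reader->Next());
  };
}
```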
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index cf14a2b7caf..9a817683909 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -27,6 +27,7 @@
 #include "arrow/io/memory.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/testing/util.h"
 #include "arrow/type.h"
@@ -150,15 +151,19 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
     return std::make_shared<FileSource>(buffer);
   }
 
-  RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
-    return MakeFlattenIterator(MakeMaybeMapIterator(
-        [](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
-        std::move(scan_task_it)));
+  RecordBatchVector Batches(ScanTaskVector scan_tasks) {
+    RecordBatchVector rbs;
+    for (auto&& scan_task : scan_tasks) {
+      EXPECT_OK_AND_ASSIGN(auto task_rbs_it, scan_task->Execute());
+      EXPECT_OK_AND_ASSIGN(auto task_rbs, task_rbs_it.ToVector());
+      rbs.insert(rbs.end(), task_rbs.begin(), task_rbs.end());
+    }
+    return rbs;
   }
 
-  RecordBatchIterator Batches(Fragment* fragment) {
-    EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
-    return Batches(std::move(scan_task_it));
+  RecordBatchVector Batches(Fragment* fragment) {
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
+    return Batches(std::move(scan_tasks));
   }
 
   void SetFilter(Expression filter) {
@@ -166,7 +171,7 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
   }
 
   std::shared_ptr<RecordBatch> SingleBatch(Fragment* fragment) {
-    auto batches = IteratorToVector(Batches(fragment));
+    auto batches = Batches(fragment);
     EXPECT_EQ(batches.size(), 1);
     return batches.front();
   }
@@ -176,8 +181,7 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin {
     int64_t actual_rows = 0;
     int64_t actual_batches = 0;
 
-    for (auto maybe_batch : Batches(fragment)) {
-      ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+    for (auto batch : Batches(fragment)) {
      actual_rows += batch->num_rows();
       ++actual_batches;
     }
@@ -229,8 +233,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
   }
 
@@ -247,13 +250,12 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) {
   format_->reader_options.dict_columns = {"utf8"};
   ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
 
-  ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
   int64_t row_count = 0;
 
   Schema expected_schema({field("utf8", dictionary(int32(), utf8()))});
 
-  for (auto maybe_task : scan_task_it) {
-    ASSERT_OK_AND_ASSIGN(auto task, maybe_task);
+  for (auto task : scan_tasks) {
     ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute());
     for (auto maybe_batch : rb_it) {
       ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
@@ -274,13 +276,12 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderPreBuffer) {
   format_->reader_options.pre_buffer = true;
   ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source));
 
-  ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_));
   int64_t task_count = 0;
   int64_t row_count = 0;
 
-  for (auto maybe_task : scan_task_it) {
-    ASSERT_OK_AND_ASSIGN(auto task, maybe_task);
+  for (auto task : scan_tasks) {
     task_count += 1;
     ASSERT_OK_AND_ASSIGN(auto rb_it, task->Execute());
     for (auto maybe_batch : rb_it) {
@@ -328,8 +329,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
     AssertSchemaEqual(*batch->schema(), *expected_schema,
                       /*check_metadata=*/false);
@@ -369,8 +369,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) {
 
   int64_t row_count = 0;
 
-  for (auto maybe_batch : Batches(fragment.get())) {
-    ASSERT_OK_AND_ASSIGN(auto batch, maybe_batch);
+  for (auto batch : Batches(fragment.get())) {
     row_count += batch->num_rows();
     AssertSchemaEqual(*batch->schema(), *expected_schema,
                       /*check_metadata=*/false);
@@ -590,10 +589,12 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
   SetFilter(greater(field_ref("i64"), literal(3)));
   CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3);
 
+  auto scan_fut = row_groups_fragment({kNumRowGroups + 1})->Scan(opts_);
+  ASSERT_FINISHES_AND_RAISES(IndexError, scan_fut);
   EXPECT_RAISES_WITH_MESSAGE_THAT(
       IndexError,
       testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"),
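The updated tests lean on the future assertion macros from `arrow/testing/future_util.h`. A self-contained sketch of how they behave (the test name is hypothetical):

```cpp
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"

TEST(ExampleFutureTest, Sketch) {
  // ASSERT_FINISHES_OK_AND_ASSIGN waits for the future, asserts success, and
  // unwraps the value.
  arrow::Future<int> ok_fut = arrow::Future<int>::MakeFinished(42);
  ASSERT_FINISHES_OK_AND_ASSIGN(auto value, ok_fut);
  ASSERT_EQ(value, 42);

  // ASSERT_FINISHES_AND_RAISES waits and asserts a specific status code.
  arrow::Future<int> bad_fut =
      arrow::Future<int>::MakeFinished(arrow::Status::IndexError("oops"));
  ASSERT_FINISHES_AND_RAISES(IndexError, bad_fut);
}
```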
-      row_groups_fragment({kNumRowGroups + 1})->Scan(opts_));
+      scan_fut.result());
 }
 
 TEST_F(TestParquetFileFormat, WriteRecordBatchReader) {
diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc
index 2258a10d141..260ae4590a9 100644
--- a/cpp/src/arrow/dataset/scanner.cc
+++ b/cpp/src/arrow/dataset/scanner.cc
@@ -32,6 +32,7 @@
 #include "arrow/util/thread_pool.h"
 
 namespace arrow {
+
 namespace dataset {
 
 std::vector<std::string> ScanOptions::MaterializedFields() const {
@@ -51,25 +52,79 @@ using arrow::internal::TaskGroup;
 
 std::shared_ptr<TaskGroup> ScanOptions::TaskGroup() const {
   if (use_threads) {
-    auto* thread_pool = arrow::internal::GetCpuThreadPool();
-    return TaskGroup::MakeThreaded(thread_pool);
+    return TaskGroup::MakeThreaded(cpu_executor);
   }
 
   return TaskGroup::MakeSerial();
 }
 
-Result<RecordBatchIterator> InMemoryScanTask::Execute() {
-  return MakeVectorIterator(record_batches_);
-}
+namespace {
+struct ARROW_DS_EXPORT RecordBatchVectorIterable {
+  Result<RecordBatchGenerator> operator()() { return MakeVectorGenerator(batches); }
+  RecordBatchVector batches;
+};
+}  // namespace
+
+InMemoryScanTask::InMemoryScanTask(RecordBatchVector batches,
+                                   std::shared_ptr<ScanOptions> options,
+                                   std::shared_ptr<Fragment> fragment)
+    : ScanTask(std::move(options), std::move(fragment)),
+      get_batches_(RecordBatchVectorIterable{std::move(batches)}) {}
+
+Result<RecordBatchGenerator> InMemoryScanTask::ExecuteAsync() {
+  auto batch_size = options_->batch_size;
+  // RecordBatch -> ScanTask
+  ARROW_ASSIGN_OR_RAISE(auto batches_gen, get_batches_());
+
+  auto slice_batches_fn =
+      [=](const std::shared_ptr<RecordBatch>& batch) -> Result<RecordBatchGenerator> {
+    RecordBatchVector batches;
+
+    auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
+    for (int i = 0; i < n_batches; i++) {
+      batches.push_back(batch->Slice(batch_size * i, batch_size));
+    }
 
-Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
-  return Status::NotImplemented("Async is not implemented for this scan task yet");
+    return MakeVectorGenerator(batches);
+  };
+  auto sliced_batch_gen_gen =
+      MakeMappedGenerator(std::move(batches_gen), slice_batches_fn);
+  return MakeConcatenatedGenerator(std::move(sliced_batch_gen_gen));
 }
 
-bool ScanTask::supports_async() const { return false; }
+class Scanner::FilterAndProjectScanTask : public ScanTask {
+ public:
+  explicit FilterAndProjectScanTask(std::shared_ptr<ScanTask> task, Expression partition)
+      : ScanTask(task->options(), task->fragment()),
+        task_(std::move(task)),
+        partition_(std::move(partition)) {}
 
-Result<FragmentIterator> Scanner::GetFragments() {
+  Result<RecordBatchGenerator> ExecuteAsync() override {
+    ARROW_ASSIGN_OR_RAISE(auto rbs, task_->ExecuteAsync());
+    ARROW_ASSIGN_OR_RAISE(Expression simplified_filter,
+                          SimplifyWithGuarantee(options()->filter, partition_));
+
+    ARROW_ASSIGN_OR_RAISE(Expression simplified_projection,
+                          SimplifyWithGuarantee(options()->projection, partition_));
+
+    RecordBatchGenerator filtered_rbs =
+        AddFiltering(rbs, simplified_filter, options_->pool);
+
+    return AddProjection(std::move(filtered_rbs), simplified_projection, options_->pool);
+  }
+
+ private:
+  std::shared_ptr<ScanTask> task_;
+  Expression partition_;
+};
+
+Result<RecordBatchIterator> ScanTask::Execute() {
+  ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync());
+  return MakeGeneratorIterator(gen);
+}
+
+Future<FragmentVector> Scanner::GetFragmentsAsync() {
   if (fragment_ != nullptr) {
-    return MakeVectorIterator(FragmentVector{fragment_});
+    return Future<FragmentVector>::MakeFinished(FragmentVector{fragment_});
   }
 
   // Transform Datasets in a flat Iterator<Fragment>. This
@@ -78,24 +133,254 @@ Result<FragmentIterator> Scanner::GetFragments() {
   return GetFragmentsFromDatasets({dataset_}, scan_options_->filter);
 }
 
+Result<FragmentIterator> Scanner::GetFragments() {
+  auto fut = GetFragmentsAsync();
+  ARROW_ASSIGN_OR_RAISE(auto fragments_vec, fut.result());
+  return MakeVectorIterator(fragments_vec);
+}
+
+PositionedRecordBatchGenerator Scanner::ScanAsync() {
+  auto unordered_generator = ScanUnorderedAsync();
+  auto cmp = [](const PositionedRecordBatch& left, const PositionedRecordBatch& right) {
+    if (left.fragment_index != right.fragment_index) {
+      return left.fragment_index > right.fragment_index;
+    }
+    if (left.scan_task_index != right.scan_task_index) {
+      return left.scan_task_index > right.scan_task_index;
+    }
+    return left.record_batch_index > right.record_batch_index;
+  };
+  auto is_next = [](const PositionedRecordBatch& last,
+                    const PositionedRecordBatch& maybe_next) {
+    if (last.record_batch == nullptr) {
+      return maybe_next.fragment_index == 0 && maybe_next.scan_task_index == 0 &&
+             maybe_next.record_batch_index == 0;
+    }
+    if (maybe_next.fragment_index > last.fragment_index + 1) {
+      return false;
+    }
+    if (maybe_next.fragment_index == last.fragment_index + 1) {
+      return maybe_next.record_batch_index == 0 && maybe_next.scan_task_index == 0 &&
+             last.last_scan_task && last.last_record_batch;
+    }
+    if (maybe_next.scan_task_index > last.scan_task_index + 1) {
+      return false;
+    }
+    if (maybe_next.scan_task_index == last.scan_task_index + 1) {
+      return maybe_next.record_batch_index == 0 && last.last_record_batch;
+    }
+    return maybe_next.record_batch_index == last.record_batch_index + 1;
+  };
+  return MakeSequencingGenerator(std::move(unordered_generator), cmp, is_next,
+                                 PositionedRecordBatch::BeforeAny());
+}
+
+Status Scanner::ValidateOptions() {
+  if (scan_options_->batch_readahead < 1) {
+    return Status::Invalid("ScanOptions::batch_readahead must be >= 1");
+  }
+  if (scan_options_->file_readahead < 1) {
+    return Status::Invalid("ScanOptions::file_readahead must be >= 1");
+  }
+  return Status::OK();
+}
+
+PositionedRecordBatchGenerator Scanner::ScanUnorderedAsync() {
+  return MakeFromFuture(GetFragmentsAsync().Then(
+      [this](const FragmentVector& fragments) -> Result<PositionedRecordBatchGenerator> {
+        RETURN_NOT_OK(ValidateOptions());
+        return GetUnorderedRecordBatchGenerator(fragments, scan_options_);
+      }));
+}
+
 Result<ScanTaskIterator> Scanner::Scan() {
-  // Transforms Iterator<Fragment> into a unified
-  // Iterator<ScanTask>. The first Iterator::Next invocation is going to do
-  // all the work of unwinding the chained iterators.
-  ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments());
-  return GetScanTaskIterator(std::move(fragment_it), scan_options_);
+  // This method is kept around for backwards compatibility. There is no longer any need
+  // for the consumer to execute the scan tasks. Instead we return each record batch
+  // wrapped in an InMemoryScanTask.
+  auto record_batch_generator = ScanAsync();
+  auto scan_options = scan_options_;
+  auto wrap_record_batch = [scan_options](const PositionedRecordBatch& positioned_batch)
+      -> Result<std::shared_ptr<ScanTask>> {
+    auto& record_batch = positioned_batch.record_batch;
+    auto get_batches = [record_batch] {
+      return MakeVectorGenerator(RecordBatchVector{record_batch});
+    };
+    return std::make_shared<InMemoryScanTask>(get_batches, scan_options,
+                                              positioned_batch.fragment);
+  };
+  auto wrapped_generator =
+      MakeMappedGenerator(std::move(record_batch_generator), wrap_record_batch);
+  return MakeGeneratorIterator(std::move(wrapped_generator));
+}
+
+Result<Iterator<PositionedRecordBatch>> Scanner::ScanBatches() {
+  auto record_batch_generator = ScanAsync();
+  return MakeGeneratorIterator(std::move(record_batch_generator));
+}
+
+Result<std::shared_ptr<RecordBatch>> Scanner::FilterRecordBatch(
+    const Expression& filter, MemoryPool* pool, const std::shared_ptr<RecordBatch>& in) {
+  compute::ExecContext exec_context{pool};
+  ARROW_ASSIGN_OR_RAISE(Datum mask,
+                        ExecuteScalarExpression(filter, Datum(in), &exec_context));
+
+  if (mask.is_scalar()) {
+    const auto& mask_scalar = mask.scalar_as<BooleanScalar>();
+    if (mask_scalar.is_valid && mask_scalar.value) {
+      return std::move(in);
+    }
+    return in->Slice(0, 0);
+  }
+
+  ARROW_ASSIGN_OR_RAISE(
+      Datum filtered,
+      compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context));
+  return filtered.record_batch();
+}
+
+RecordBatchGenerator Scanner::AddFiltering(RecordBatchGenerator rbs, Expression filter,
+                                           MemoryPool* pool) {
+  auto mapper = [=](const std::shared_ptr<RecordBatch>& in) {
+    return FilterRecordBatch(filter, pool, in);
+  };
+  return MakeMappedGenerator(std::move(rbs), mapper);
+}
+
+Result<std::shared_ptr<RecordBatch>> Scanner::ProjectRecordBatch(
+    const Expression& projection, MemoryPool* pool,
+    const std::shared_ptr<RecordBatch>& in) {
+  compute::ExecContext exec_context{pool};
+  ARROW_ASSIGN_OR_RAISE(Datum projected,
+                        ExecuteScalarExpression(projection, Datum(in), &exec_context));
+  DCHECK_EQ(projected.type()->id(), Type::STRUCT);
+  if (projected.shape() == ValueDescr::SCALAR) {
Broadcast to an array
+    ARROW_ASSIGN_OR_RAISE(projected,
+                          MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool));
   }
-  auto schema = batches[0]->schema();
-  auto fragment =
-      std::make_shared<InMemoryFragment>(std::move(schema), std::move(batches));
-  return fragment->Scan(std::move(options));
+
+  ARROW_ASSIGN_OR_RAISE(auto out,
+                        RecordBatch::FromStructArray(projected.array_as<StructArray>()));
+
+  return out->ReplaceSchemaMetadata(in->schema()->metadata());
+}
+
+RecordBatchGenerator Scanner::AddProjection(RecordBatchGenerator rbs,
+                                            Expression projection, MemoryPool* pool) {
+  auto mapper = [=](const std::shared_ptr<RecordBatch>& in) {
+    return ProjectRecordBatch(projection, pool, in);
+  };
+  return MakeMappedGenerator(std::move(rbs), mapper);
+}
+
+struct PositionedScanTask {
+  std::shared_ptr<ScanTask> scan_task;
+  int fragment_index;
+  int scan_task_index;
+  bool last_scan_task;
+
+  PositionedScanTask ReplaceScanTask(std::shared_ptr<ScanTask> new_task) const {
+    return PositionedScanTask{new_task, fragment_index, scan_task_index, last_scan_task};
+  }
+};
+
+std::vector<PositionedScanTask> PositionTasks(const ScanTaskVector& scan_tasks,
+                                              int fragment_index) {
+  std::vector<PositionedScanTask> positioned_tasks;
+  positioned_tasks.reserve(scan_tasks.size());
+  for (std::size_t i = 0; i < scan_tasks.size(); i++) {
+    positioned_tasks.push_back(PositionedScanTask{
+        scan_tasks[i], fragment_index, static_cast<int>(i), i == scan_tasks.size() - 1});
+  }
+  return positioned_tasks;
+}
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::PositionedScanTask> {
+  static dataset::PositionedScanTask End() {
+    return dataset::PositionedScanTask{nullptr, -1, -1, false};
+  }
+  static bool IsEnd(const dataset::PositionedScanTask& val) {
+    return val.scan_task == nullptr;
+  }
+};
+
+namespace dataset {
+
+PositionedRecordBatchGenerator Scanner::FragmentToPositionedRecordBatches(
+    std::shared_ptr<ScanOptions> options, const std::shared_ptr<Fragment>& fragment,
+    int fragment_index) {
+  return MakeFromFuture(fragment->Scan(options).Then(
+      [fragment, fragment_index, options](const ScanTaskVector& scan_tasks) {
+        auto positioned_tasks = PositionTasks(scan_tasks, fragment_index);
+        // Apply the filter and/or projection to incoming RecordBatches by
+        // wrapping the ScanTask with a FilterAndProjectScanTask
+        auto wrap_scan_task =
+            [fragment](const PositionedScanTask& task) -> Result<PositionedScanTask> {
+          auto partition = fragment->partition_expression();
+          auto wrapped_task = std::make_shared<FilterAndProjectScanTask>(
+              std::move(task.scan_task), partition);
+          return task.ReplaceScanTask(wrapped_task);
+        };
+
+        auto scan_tasks_gen = MakeVectorGenerator(positioned_tasks);
+        auto wrapped_tasks_gen =
+            MakeMappedGenerator(std::move(scan_tasks_gen), wrap_scan_task);
+
+        auto execute_task = [fragment, fragment_index](const PositionedScanTask& task)
+            -> Result<PositionedRecordBatchGenerator> {
+          ARROW_ASSIGN_OR_RAISE(auto record_batch_gen, task.scan_task->ExecuteAsync());
+          auto enumerated_rb_gen = MakeEnumeratedGenerator(record_batch_gen);
+          auto scan_task_index = task.scan_task_index;
+          auto last_scan_task = task.last_scan_task;
+          auto to_positioned_batch =
+              [fragment, fragment_index, scan_task_index,
+               last_scan_task](const Enumerated<std::shared_ptr<RecordBatch>>& batch)
+              // FIXME This shouldn't have to be a Result; fix the mapped generator
+              // SFINAE weirdness
+              -> Result<PositionedRecordBatch> {
+            return PositionedRecordBatch{
+                *batch.value,   fragment,    fragment_index, scan_task_index,
+                last_scan_task, batch.index, batch.last};
+          };
+          return MakeMappedGenerator(enumerated_rb_gen, to_positioned_batch);
+        };
+
+        auto rb_gen_gen = MakeMappedGenerator(std::move(wrapped_tasks_gen), execute_task);
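+        // MakeMergedGenerator below pulls from up to batch_readahead of the
+        // per-scan-task batch generators at once, so batches can surface out
+        // of order here; ScanAsync resequences them afterwards.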
+        // This is really "how many scan tasks to run at once". However, all
+        // readers either produce a single scan task (in which case this is
+        // pointless) or, in the Parquet case, one scan task per row group,
+        // which is close enough to per-batch.
+        return MakeMergedGenerator(rb_gen_gen, options->batch_readahead);
+      }));
+}
+
+AsyncGenerator<PositionedRecordBatchGenerator>
+Scanner::FragmentsToPositionedRecordBatches(std::shared_ptr<ScanOptions> options,
+                                            const FragmentVector& fragments) {
+  auto fragment_generator = MakeVectorGenerator(fragments);
+  auto fragment_counter = std::make_shared<std::atomic<int>>(0);
+  std::function<PositionedRecordBatchGenerator(const std::shared_ptr<Fragment>&)>
+      fragment_to_scan_tasks =
+          [options, fragment_counter](const std::shared_ptr<Fragment>& fragment) {
+            auto fragment_index = fragment_counter->fetch_add(1);
+            return FragmentToPositionedRecordBatches(options, fragment, fragment_index);
+          };
+  return MakeMappedGenerator(fragment_generator, fragment_to_scan_tasks);
+}
+
+/// \brief GetUnorderedRecordBatchGenerator transforms a FragmentVector into a
+/// generator of positioned record batches
+Result<PositionedRecordBatchGenerator> Scanner::GetUnorderedRecordBatchGenerator(
+    const FragmentVector& fragments, std::shared_ptr<ScanOptions> options) {
+  // Fragments -> a generator of per-fragment record batch generators
+  auto scan_task_generator_generator =
+      FragmentsToPositionedRecordBatches(options, fragments);
+  // Merge, reading ahead in up to file_readahead fragments at once
+  auto merged = MakeMergedGenerator(std::move(scan_task_generator_generator),
+                                    options->file_readahead);
+  return merged;
+}
+
 ScannerBuilder::ScannerBuilder(std::shared_ptr<Dataset> dataset)
@@ -172,72 +457,73 @@ Result<std::shared_ptr<Scanner>> ScannerBuilder::Finish() {
   return std::make_shared<Scanner>(dataset_, scan_options_);
 }
 
-static inline RecordBatchVector FlattenRecordBatchVector(
-    std::vector<RecordBatchVector> nested_batches) {
-  RecordBatchVector flattened;
-
-  for (auto& task_batches : nested_batches) {
-    for (auto& batch : task_batches) {
-      flattened.emplace_back(std::move(batch));
-    }
-  }
-
-  return flattened;
-}
-
 struct TableAssemblyState {
   /// Protecting mutating accesses to batches
   std::mutex mutex{};
-  std::vector<RecordBatchVector> batches{};
+  std::vector<std::vector<RecordBatchVector>> batches{};
+  int scan_task_id = 0;
 
-  void Emplace(RecordBatchVector b, size_t position) {
+  void Emplace(std::shared_ptr<RecordBatch> batch, size_t fragment_index,
+               size_t task_index, size_t record_batch_index) {
     std::lock_guard<std::mutex> lock(mutex);
-    if (batches.size() <= position) {
-      batches.resize(position + 1);
+    if (batches.size() <= fragment_index) {
+      batches.resize(fragment_index + 1);
     }
-    batches[position] = std::move(b);
+    if (batches[fragment_index].size() <= task_index) {
+      batches[fragment_index].resize(task_index + 1);
+    }
+    if (batches[fragment_index][task_index].size() <= record_batch_index) {
+      batches[fragment_index][task_index].resize(record_batch_index + 1);
+    }
+    batches[fragment_index][task_index][record_batch_index] = std::move(batch);
+  }
+
+  std::vector<std::shared_ptr<RecordBatch>> Finish() {
+    std::vector<std::shared_ptr<RecordBatch>> all_batches;
+    for (auto& fragment_batches : batches) {
+      for (auto& task_batches : fragment_batches) {
+        auto end = std::make_move_iterator(task_batches.end());
+        for (auto it = std::make_move_iterator(task_batches.begin()); it != end; it++) {
+          all_batches.push_back(*it);
+        }
+      }
+    }
+    return all_batches;
   }
 };
 
 Result<std::shared_ptr<Table>> Scanner::ToTable() {
-  ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
-  auto task_group = scan_options_->TaskGroup();
+  auto table_fut = ToTableAsync();
+  ARROW_ASSIGN_OR_RAISE(auto table, table_fut.result());
+  return table;
+}
 
+Future<std::shared_ptr<Table>> Scanner::ToTableAsync() {
+  auto scan_options = scan_options_;
+  auto positioned_batch_gen = ScanUnorderedAsync();
   /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't
   /// invalidate concurrently running tasks when Finish() early returns
   /// and the mutex/batches go out of scope.
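  /// The batches vector is indexed as [fragment][scan task][record batch] so
  /// that unordered arrivals can be slotted into place and flattened back
  /// into scan order by Finish().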
   auto state = std::make_shared<TableAssemblyState>();
 
-  size_t scan_task_id = 0;
-  std::vector<Future<>> scan_futures;
-  for (auto maybe_scan_task : scan_task_it) {
-    ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
-
-    auto id = scan_task_id++;
-    if (scan_task->supports_async()) {
-      ARROW_ASSIGN_OR_RAISE(auto scan_gen, scan_task->ExecuteAsync());
-      auto scan_fut = CollectAsyncGenerator(std::move(scan_gen))
-                          .Then([state, id](const RecordBatchVector& rbs) {
-                            state->Emplace(rbs, id);
-                          });
-      scan_futures.push_back(std::move(scan_fut));
-    } else {
-      task_group->Append([state, id, scan_task] {
-        ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
-        ARROW_ASSIGN_OR_RAISE(auto local, batch_it.ToVector());
-        state->Emplace(std::move(local), id);
-        return Status::OK();
+  // TODO(ARROW-12023) Ideally this mapping function would just return Status but that
+  // isn't allowed because Status is not iterable
+  // FIXME Return PositionedRecordBatch and not Result once the SFINAE confusion is
+  // figured out in async_generator
+  auto table_building_task =
+      [state](const PositionedRecordBatch& batch) -> Result<PositionedRecordBatch> {
+    state->Emplace(batch.record_batch, batch.fragment_index, batch.scan_task_index,
+                   batch.record_batch_index);
+    return batch;
+  };
+
+  auto table_building_gen =
+      MakeMappedGeneratorAsync(positioned_batch_gen, table_building_task);
+
+  return DiscardAllFromAsyncGenerator(table_building_gen)
+      .Then([state, scan_options](...) {
+        return Table::FromRecordBatches(scan_options->projected_schema, state->Finish());
       });
-    }
-  }
-
-  // Wait for all async tasks to complete, or the first error
-  RETURN_NOT_OK(AllComplete(scan_futures).status());
-
-  // Wait for all sync tasks to complete, or the first error.
-  RETURN_NOT_OK(task_group->Finish());
-
-  return Table::FromRecordBatches(scan_options_->projected_schema,
-                                  FlattenRecordBatchVector(std::move(state->batches)));
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h
index c3cce00d8c5..b88992fd22b 100644
--- a/cpp/src/arrow/dataset/scanner.h
+++ b/cpp/src/arrow/dataset/scanner.h
@@ -29,6 +29,7 @@
 #include "arrow/dataset/projector.h"
 #include "arrow/dataset/type_fwd.h"
 #include "arrow/dataset/visibility.h"
+#include "arrow/io/interfaces.h"
 #include "arrow/memory_pool.h"
 #include "arrow/type_fwd.h"
 #include "arrow/util/async_generator.h"
@@ -39,6 +40,8 @@ using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;
 namespace dataset {
 
 constexpr int64_t kDefaultBatchSize = 1 << 20;
+constexpr int32_t kDefaultBatchReadahead = 32;
+constexpr int32_t kDefaultFileReadahead = 8;
 
 struct ARROW_DS_EXPORT ScanOptions {
   // Filter and projection
@@ -65,9 +68,21 @@ struct ARROW_DS_EXPORT ScanOptions {
   // Maximum row count for scanned batches.
   int64_t batch_size = kDefaultBatchSize;
 
+  // How many batches to read ahead within a file
+  int32_t batch_readahead = kDefaultBatchReadahead;
+
+  // How many files to read ahead
+  int32_t file_readahead = kDefaultFileReadahead;
+
   /// A pool from which materialized and scanned arrays will be allocated.
   MemoryPool* pool = arrow::default_memory_pool();
 
+  /// Executor on which to run any CPU tasks
+  internal::Executor* cpu_executor = internal::GetCpuThreadPool();
+
+  /// IOContext for any IO tasks
+  io::IOContext io_context;
+
   /// Indicate if the Scanner should make use of a ThreadPool.
bool use_threads = false; @@ -102,9 +117,8 @@ class ARROW_DS_EXPORT ScanTask { /// \brief Iterate through sequence of materialized record batches /// resulting from the Scan. Execution semantics are encapsulated in the /// particular ScanTask implementation - virtual Result Execute() = 0; - virtual Result ExecuteAsync(); - virtual bool supports_async() const; + virtual Result ExecuteAsync() = 0; + virtual Result Execute(); virtual ~ScanTask() = default; @@ -119,24 +133,63 @@ class ARROW_DS_EXPORT ScanTask { std::shared_ptr fragment_; }; +using ScanTaskGenerator = AsyncGenerator>; + /// \brief A trivial ScanTask that yields the RecordBatch of an array. class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask { public: - InMemoryScanTask(std::vector> record_batches, - std::shared_ptr options, + InMemoryScanTask(RecordBatchIterable get_batches, std::shared_ptr options, std::shared_ptr fragment) : ScanTask(std::move(options), std::move(fragment)), - record_batches_(std::move(record_batches)) {} + get_batches_(std::move(get_batches)) {} + + InMemoryScanTask(RecordBatchVector batches, std::shared_ptr options, + std::shared_ptr fragment); - Result Execute() override; + Result ExecuteAsync() override; protected: - std::vector> record_batches_; + RecordBatchIterable get_batches_; +}; + +struct PositionedRecordBatch { + std::shared_ptr record_batch; + std::shared_ptr fragment; + int fragment_index; + int scan_task_index; + bool last_scan_task; + int record_batch_index; + bool last_record_batch; + + bool operator==(const PositionedRecordBatch& other) const { + return fragment_index == other.fragment_index && + scan_task_index == other.scan_task_index && + record_batch_index == other.record_batch_index; + } + + static PositionedRecordBatch BeforeAny() { + return PositionedRecordBatch{NULL, NULL, -1, -1, false, -1, false}; + } + static PositionedRecordBatch AfterAny() { + return PositionedRecordBatch{NULL, NULL, -1, -1, true, -1, true}; + } +}; + +} // namespace dataset +template <> +struct IterationTraits { + static dataset::PositionedRecordBatch End() { + return dataset::PositionedRecordBatch::AfterAny(); + } + static bool IsEnd(const dataset::PositionedRecordBatch& val) { + return val.record_batch == NULL; + } }; -ARROW_DS_EXPORT Result ScanTaskIteratorFromRecordBatch( - std::vector> batches, - std::shared_ptr options); +namespace dataset { + +using PositionedRecordBatchGenerator = AsyncGenerator; +using PositionedRecordBatchIterator = Iterator; /// \brief Scanner is a materialized scan operation with context and options /// bound. A scanner is the class that glues ScanTask, Fragment, @@ -154,18 +207,43 @@ class ARROW_DS_EXPORT Scanner { Scanner(std::shared_ptr fragment, std::shared_ptr scan_options) : fragment_(std::move(fragment)), scan_options_(std::move(scan_options)) {} - /// \brief The Scan operator returns a stream of ScanTask. The caller is + /// \brief The Scan operator returns a stream of RecordBatch futures. The caller is /// responsible to dispatch/schedule said tasks. Tasks should be safe to run /// in a concurrent fashion and outlive the iterator. + PositionedRecordBatchGenerator ScanUnorderedAsync(); + + /// \brief The record batches returned in this version will be + /// resequenced so they arrive in order. This will introduce some latency. + /// \see ScanUnorderedAsync + PositionedRecordBatchGenerator ScanAsync(); + + /// \brief The scan operator returns an iterator of ScanTask futures. This API + /// is being kept around for legacy purposes. 
New development should prefer
+  /// ScanBatches for synchronous scanning.
+  /// \see ScanAsync
+  /// \see ScanBatches
+  ARROW_DEPRECATED("Deprecated in 4.0.0. Use ScanBatches instead.")
   Result<ScanTaskIterator> Scan();
 
+  /// \brief The scan operator loads the dataset, filters it, and projects it. The
+  /// resulting record batches are returned as an iterator. Batches are yielded
+  /// as soon as they are loaded; the entire table is not read into memory.
+  Result<PositionedRecordBatchIterator> ScanBatches();
+
   /// \brief Convert a Scanner into a Table.
   ///
   /// Use this convenience utility with care. This will serially materialize the
   /// Scan result in memory before creating the Table.
   Result<std::shared_ptr<Table>> ToTable();
 
-  /// \brief GetFragments returns an iterator over all Fragments in this scan.
+  /// \brief Convert a Scanner into a Table.
+  ///
+  /// Use this convenience utility with care. This will serially materialize the
+  /// Scan result in memory before creating the Table.
+  Future<std::shared_ptr<Table>> ToTableAsync();
+
+  /// \brief GetFragmentsAsync returns a vector of all Fragments in this scan.
+  Future<FragmentVector> GetFragmentsAsync();
   Result<FragmentIterator> GetFragments();
 
   const std::shared_ptr<Schema>& schema() const {
@@ -175,6 +253,30 @@ class ARROW_DS_EXPORT Scanner {
   const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }
 
  protected:
+  static RecordBatchGenerator AddFiltering(RecordBatchGenerator rbs, Expression filter,
+                                           MemoryPool* pool);
+  static Result<std::shared_ptr<RecordBatch>> FilterRecordBatch(
+      const Expression& filter, MemoryPool* pool,
+      const std::shared_ptr<RecordBatch>& in);
+
+  static Result<std::shared_ptr<RecordBatch>> ProjectRecordBatch(
+      const Expression& projection, MemoryPool* pool,
+      const std::shared_ptr<RecordBatch>& in);
+  static RecordBatchGenerator AddProjection(RecordBatchGenerator rbs,
+                                            Expression projection, MemoryPool* pool);
+
+  static AsyncGenerator<PositionedRecordBatchGenerator>
+  FragmentsToPositionedRecordBatches(std::shared_ptr<ScanOptions> options,
+                                     const FragmentVector& fragments);
+  static PositionedRecordBatchGenerator FragmentToPositionedRecordBatches(
+      std::shared_ptr<ScanOptions> options, const std::shared_ptr<Fragment>& fragment,
+      int fragment_index);
+  static Result<PositionedRecordBatchGenerator> GetUnorderedRecordBatchGenerator(
+      const FragmentVector& fragments, std::shared_ptr<ScanOptions> options);
+
+  Status ValidateOptions();
+
+  class FilterAndProjectScanTask;
+
   std::shared_ptr<Dataset> dataset_;
   // TODO(ARROW-8065) remove fragment_ after a Dataset is constructible from fragments
   std::shared_ptr<Fragment> fragment_;
@@ -205,7 +307,8 @@ class ARROW_DS_EXPORT ScannerBuilder {
   /// Schema.
   Status Project(std::vector<std::string> columns);
 
-  /// \brief Set expressions which will be evaluated to produce the materialized columns.
+  /// \brief Set expressions which will be evaluated to produce the materialized
+  /// columns.
   ///
   /// Columns which are not referenced may not be read from fragments.
/// diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 3101be477fd..44cf2b57ecc 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -28,7 +28,11 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" +#include "arrow/util/algorithm.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/future.h" #include "arrow/util/logging.h" +#include "arrow/util/vector.h" namespace arrow { @@ -36,202 +40,6 @@ using internal::checked_cast; namespace dataset { -// TODO(ARROW-7001) This synchronous version is no longer needed, can use async version -// regardless of sync/async of source -inline RecordBatchIterator FilterRecordBatch(RecordBatchIterator it, Expression filter, - MemoryPool* pool) { - return MakeMaybeMapIterator( - [=](std::shared_ptr in) -> Result> { - compute::ExecContext exec_context{pool}; - ARROW_ASSIGN_OR_RAISE(Datum mask, - ExecuteScalarExpression(filter, Datum(in), &exec_context)); - - if (mask.is_scalar()) { - const auto& mask_scalar = mask.scalar_as(); - if (mask_scalar.is_valid && mask_scalar.value) { - return std::move(in); - } - return in->Slice(0, 0); - } - - ARROW_ASSIGN_OR_RAISE( - Datum filtered, - compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context)); - return filtered.record_batch(); - }, - std::move(it)); -} - -inline Result> DoFilterRecordBatch( - const Expression& filter, MemoryPool* pool, const std::shared_ptr& in) { - compute::ExecContext exec_context{pool}; - ARROW_ASSIGN_OR_RAISE(Datum mask, - ExecuteScalarExpression(filter, Datum(in), &exec_context)); - - if (mask.is_scalar()) { - const auto& mask_scalar = mask.scalar_as(); - if (mask_scalar.is_valid && mask_scalar.value) { - return std::move(in); - } - return in->Slice(0, 0); - } - - ARROW_ASSIGN_OR_RAISE( - Datum filtered, - compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context)); - return filtered.record_batch(); -} - -inline RecordBatchGenerator FilterRecordBatch(RecordBatchGenerator rbs, Expression filter, - MemoryPool* pool) { - // TODO(ARROW-7001) This changes to auto - std::function>(const std::shared_ptr&)> - mapper = [=](const std::shared_ptr& in) { - return DoFilterRecordBatch(filter, pool, in); - }; - return MakeMappedGenerator(std::move(rbs), mapper); -} - -// TODO(ARROW-7001) This synchronous version is no longer needed, all branches use async -// version -inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, - Expression projection, MemoryPool* pool) { - return MakeMaybeMapIterator( - [=](std::shared_ptr in) -> Result> { - compute::ExecContext exec_context{pool}; - ARROW_ASSIGN_OR_RAISE(Datum projected, ExecuteScalarExpression( - projection, Datum(in), &exec_context)); - - DCHECK_EQ(projected.type()->id(), Type::STRUCT); - if (projected.shape() == ValueDescr::SCALAR) { - // Only virtual columns are projected. 
Broadcast to an array - ARROW_ASSIGN_OR_RAISE( - projected, MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool)); - } - - ARROW_ASSIGN_OR_RAISE( - auto out, RecordBatch::FromStructArray(projected.array_as())); - - return out->ReplaceSchemaMetadata(in->schema()->metadata()); - }, - std::move(it)); -} - -inline Result> DoProjectRecordBatch( - const Expression& projection, MemoryPool* pool, - const std::shared_ptr& in) { - compute::ExecContext exec_context{pool}; - ARROW_ASSIGN_OR_RAISE(Datum projected, - ExecuteScalarExpression(projection, Datum(in), &exec_context)); - DCHECK_EQ(projected.type()->id(), Type::STRUCT); - if (projected.shape() == ValueDescr::SCALAR) { - // Only virtual columns are projected. Broadcast to an array - ARROW_ASSIGN_OR_RAISE(projected, - MakeArrayFromScalar(*projected.scalar(), in->num_rows(), pool)); - } - - ARROW_ASSIGN_OR_RAISE(auto out, - RecordBatch::FromStructArray(projected.array_as())); - - return out->ReplaceSchemaMetadata(in->schema()->metadata()); -} - -inline RecordBatchGenerator ProjectRecordBatch(RecordBatchGenerator rbs, - Expression projection, MemoryPool* pool) { - // TODO(ARROW-7001) This changes to auto - std::function>(const std::shared_ptr&)> - mapper = [=](const std::shared_ptr& in) { - return DoProjectRecordBatch(projection, pool, in); - }; - return MakeMappedGenerator(std::move(rbs), mapper); -} - -class FilterAndProjectScanTask : public ScanTask { - public: - explicit FilterAndProjectScanTask(std::shared_ptr task, Expression partition) - : ScanTask(task->options(), task->fragment()), - task_(std::move(task)), - partition_(std::move(partition)) {} - - bool supports_async() const override { return task_->supports_async(); } - - Result ExecuteSync() { - ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); - - ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, - SimplifyWithGuarantee(options()->filter, partition_)); - - ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, - SimplifyWithGuarantee(options()->projection, partition_)); - - RecordBatchIterator filter_it = - FilterRecordBatch(std::move(it), simplified_filter, options_->pool); - - return ProjectRecordBatch(std::move(filter_it), simplified_projection, - options_->pool); - } - - Result Execute() override { - if (task_->supports_async()) { - ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync()); - return MakeGeneratorIterator(std::move(gen)); - } else { - return ExecuteSync(); - } - } - - Result ExecuteAsync() override { - if (!task_->supports_async()) { - return Status::Invalid( - "ExecuteAsync should not have been called on FilterAndProjectScanTask if the " - "source task did not support async"); - } - ARROW_ASSIGN_OR_RAISE(auto gen, task_->ExecuteAsync()); - - ARROW_ASSIGN_OR_RAISE(Expression simplified_filter, - SimplifyWithGuarantee(options()->filter, partition_)); - - ARROW_ASSIGN_OR_RAISE(Expression simplified_projection, - SimplifyWithGuarantee(options()->projection, partition_)); - - RecordBatchGenerator filter_gen = - FilterRecordBatch(std::move(gen), simplified_filter, options_->pool); - - return ProjectRecordBatch(std::move(filter_gen), simplified_projection, - options_->pool); - } - - private: - std::shared_ptr task_; - Expression partition_; -}; - -/// \brief GetScanTaskIterator transforms an Iterator in a -/// flattened Iterator. 
-inline Result GetScanTaskIterator( - FragmentIterator fragments, std::shared_ptr options) { - // Fragment -> ScanTaskIterator - auto fn = [options](std::shared_ptr fragment) -> Result { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options)); - - auto partition = fragment->partition_expression(); - // Apply the filter and/or projection to incoming RecordBatches by - // wrapping the ScanTask with a FilterAndProjectScanTask - auto wrap_scan_task = - [partition](std::shared_ptr task) -> std::shared_ptr { - return std::make_shared(std::move(task), partition); - }; - - return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); - }; - - // Iterator> - auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragments)); - - // Iterator - return MakeFlattenIterator(std::move(maybe_scantask_it)); -} - inline Status NestedFieldRefsNotImplemented() { // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) assume that // only top level fields will be materialized. diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index eec8ed21668..140bf302ef7 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -17,7 +17,9 @@ #include "arrow/dataset/scanner.h" +#include #include +#include #include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" @@ -32,10 +34,144 @@ using testing::IsEmpty; namespace arrow { namespace dataset { +// TODO(westonpace) This test is here to make sure we keep this constructor because it is +// used in c_glib/arrow-dataset-glib/scanner.cpp however it doesn't really make sense to +// make an InMemoryScanTask when you already have an InMemoryFragment. We should get rid +// of the call in c_glib. +TEST(TestInMemoryScanTask, FromBatches) { + auto sch = schema({field("int32", int32())}); + RecordBatchVector batches{ConstantArrayGenerator::Zeroes(10, sch), + ConstantArrayGenerator::Zeroes(20, sch)}; + auto scan_options = std::make_shared(); + auto fragment = std::make_shared(sch, batches); + auto scan_task = std::make_shared(batches, scan_options, fragment); + ASSERT_OK_AND_ASSIGN(auto batches_gen, scan_task->ExecuteAsync()); + ASSERT_FINISHES_OK_AND_ASSIGN(auto actual_batches, CollectAsyncGenerator(batches_gen)); + ASSERT_EQ(2, batches.size()); + AssertBatchesEqual(*batches[0], *actual_batches[0]); + AssertBatchesEqual(*batches[1], *actual_batches[1]); +} + constexpr int64_t kNumberChildDatasets = 2; constexpr int64_t kNumberBatches = 16; constexpr int64_t kBatchSize = 1024; +class ControlledScanTask : public ScanTask { + public: + ControlledScanTask(std::shared_ptr options, + std::shared_ptr fragment, std::shared_ptr schema) + : ScanTask(std::move(options), std::move(fragment)), schema_(std::move(schema)) {} + virtual ~ControlledScanTask() = default; + Result ExecuteAsync() override { + execute_called_ = true; + return record_batch_generator_; + }; + + std::shared_ptr MakeRecordBatch(uint32_t value) { + auto arr = ConstantArrayGenerator::Int32(1, value); + return RecordBatch::Make(schema_, 1, {arr}); + } + + void DeliverBatch(uint32_t value) { + return record_batch_generator_.producer().Push(MakeRecordBatch(value)); + } + + void Close() { record_batch_generator_.producer().Close(); } + + bool execute_called() { return execute_called_; } + + private: + std::shared_ptr schema_; + PushGenerator> record_batch_generator_; + bool execute_called_ = false; +}; + +class ControlledFragment : public Fragment { + public: + ControlledFragment(std::shared_ptr 
options, std::shared_ptr schema) + : tasks_fut_(Future::Make()), options_(std::move(options)) { + physical_schema_ = schema; + } + virtual ~ControlledFragment() {} + + Future Scan(std::shared_ptr options) override { + Future tasks_fut = tasks_fut_; + // Can't hold onto the scan tasks once we've delivered them or else we have a circular + // reference. If they haven't been marked finished yet then we will reset later + if (tasks_fut_.is_finished()) { + tasks_fut_ = Future::Make(); + } else { + delivered = true; + } + return tasks_fut; + } + std::vector> DeliverScanTasks(int num_tasks) { + auto fragment = shared_from_this(); + std::vector> tasks; + ScanTaskVector base_tasks; + for (int i = 0; i < num_tasks; i++) { + auto task = + std::make_shared(options_, fragment, physical_schema_); + tasks.push_back(task); + base_tasks.push_back(task); + } + tasks_fut_.MarkFinished(base_tasks); + if (delivered) { + tasks_fut_ = Future::Make(); + delivered = false; + } + return tasks; + } + + std::string type_name() const override { return "unit-test-fragment"; } + + Result> ReadPhysicalSchemaImpl() override { + return physical_schema_; + } + + private: + Future tasks_fut_; + std::shared_ptr options_; + bool delivered = false; +}; + +class ControlledDataset : public Dataset { + public: + // Dataset API + explicit ControlledDataset(std::shared_ptr schema, + std::shared_ptr scan_options) + : Dataset(schema), + scan_options_(scan_options), + fragments_fut_(Future::Make()) {} + + virtual ~ControlledDataset() {} + std::string type_name() const override { return "unit-test-gated-in-memory"; } + Result> ReplaceSchema( + std::shared_ptr schema) const override { + return Status::NotImplemented("Should not be called by unit test"); + } + + std::vector> DeliverFragments(int num_fragments) { + std::vector> fragments; + std::vector> base_fragments; + for (int i = 0; i < num_fragments; i++) { + fragments.push_back(std::make_shared(scan_options_, schema_)); + base_fragments.push_back(fragments[i]); + } + fragments_fut_.MarkFinished(base_fragments); + return fragments; + } + + protected: + Future GetFragmentsImpl(Expression predicate) const override { + return fragments_fut_; + } + + private: + std::shared_ptr scan_options_; + Future fragments_fut_; +}; + class TestScanner : public DatasetFixtureMixin { protected: Scanner MakeScanner(std::shared_ptr batch) { @@ -43,7 +179,7 @@ class TestScanner : public DatasetFixtureMixin { batch}; DatasetVector children{static_cast(kNumberChildDatasets), - std::make_shared(batch->schema(), batches)}; + InMemoryDataset::FromBatches(batch->schema(), batches)}; EXPECT_OK_AND_ASSIGN(auto dataset, UnionDataset::Make(batch->schema(), children)); @@ -59,6 +195,14 @@ class TestScanner : public DatasetFixtureMixin { // structures of the scanner, i.e. 
Scanner[Dataset[ScanTask[RecordBatch]]] AssertScannerEquals(expected.get(), &scanner); } + + void AssertScanBatchesEqualsRepetitionsOf( + Scanner scanner, std::shared_ptr batch, + const int64_t total_batches = kNumberChildDatasets * kNumberBatches) { + auto expected = ConstantArrayGenerator::Repeat(total_batches, batch); + + AssertScanBatchesEquals(expected.get(), &scanner); + } }; TEST_F(TestScanner, Scan) { @@ -67,6 +211,12 @@ TEST_F(TestScanner, Scan) { AssertScannerEqualsRepetitionsOf(MakeScanner(batch), batch); } +TEST_F(TestScanner, ScanBatches) { + SetSchema({field("i32", int32()), field("f64", float64())}); + auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); + AssertScanBatchesEqualsRepetitionsOf(MakeScanner(batch), batch); +} + TEST_F(TestScanner, ScanWithCappedBatchSize) { SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); @@ -112,6 +262,7 @@ TEST_F(TestScanner, MaterializeMissingColumn) { ConstantArrayGenerator::Zeroes(kBatchSize, schema({field("i32", int32())})); auto fragment_missing_f64 = std::make_shared( + batch_missing_f64->schema(), RecordBatchVector{static_cast(kNumberChildDatasets * kNumberBatches), batch_missing_f64}, equal(field_ref("f64"), literal(2.5))); @@ -129,6 +280,100 @@ TEST_F(TestScanner, MaterializeMissingColumn) { AssertScannerEqualsRepetitionsOf(*scanner, batch_with_f64); } +TEST_F(TestScanner, PreservesOrder) { + auto sch = schema({field("i32", int32())}); + auto scan_options = std::make_shared(); + scan_options->use_threads = true; + auto dataset = std::make_shared(sch, scan_options); + + ScannerBuilder builder{dataset, scan_options}; + ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + + auto table_fut = scanner->ToTableAsync(); + + auto fragments = dataset->DeliverFragments(2); + auto f2_tasks = fragments[1]->DeliverScanTasks(2); + SleepABit(); + f2_tasks[1]->DeliverBatch(3); + SleepABit(); + auto f1_tasks = fragments[0]->DeliverScanTasks(2); + SleepABit(); + f1_tasks[1]->DeliverBatch(1); + SleepABit(); + f2_tasks[0]->DeliverBatch(2); + SleepABit(); + f1_tasks[0]->DeliverBatch(0); + f2_tasks[1]->Close(); + f2_tasks[0]->Close(); + f1_tasks[0]->Close(); + f1_tasks[1]->Close(); + ASSERT_FINISHES_OK_AND_ASSIGN(auto table, table_fut); + auto chunks = table->column(0)->chunks(); + ASSERT_EQ(4, chunks.size()); + ASSERT_EQ(4, table->num_rows()); + EXPECT_EQ("0", chunks[0]->GetScalar(0).ValueOrDie()->ToString()); + EXPECT_EQ("1", chunks[1]->GetScalar(0).ValueOrDie()->ToString()); + EXPECT_EQ("2", chunks[2]->GetScalar(0).ValueOrDie()->ToString()); + EXPECT_EQ("3", chunks[3]->GetScalar(0).ValueOrDie()->ToString()); +} + +int CountExecuted(std::vector> scan_tasks) { + int result = 0; + for (const auto& scan_task : scan_tasks) { + if (scan_task->execute_called()) { + result++; + } + } + return result; +} + +TEST_F(TestScanner, FileReadahead) { + auto sch = schema({field("i32", int32())}); + auto scan_options = std::make_shared(); + scan_options->use_threads = true; + scan_options->file_readahead = 2; + scan_options->batch_readahead = 100; + auto dataset = std::make_shared(sch, scan_options); + + ScannerBuilder builder{dataset, scan_options}; + ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); + + auto table_fut = scanner->ToTableAsync(); + + // Pretend the first two files are very slow. 
It should not start reading the third
+  // file
+  auto fragments = dataset->DeliverFragments(5);
+  std::vector<std::shared_ptr<ControlledScanTask>> scan_tasks;
+  for (int i = 2; i < 5; i++) {
+    auto later_scan_tasks = fragments[i]->DeliverScanTasks(1);
+    later_scan_tasks[0]->DeliverBatch(i);
+    later_scan_tasks[0]->Close();
+    scan_tasks.push_back(later_scan_tasks[0]);
+  }
+  // The first file has returned a scan task (but no batches have arrived); the
+  // second file hasn't even returned a scan task yet
+  auto first_scan_tasks = fragments[0]->DeliverScanTasks(1);
+  ASSERT_FALSE(table_fut.Wait(0.1));
+  ASSERT_EQ(0, CountExecuted(scan_tasks));
+
+  // Finish up the two slow files and then the whole table should read, in order
+  first_scan_tasks[0]->DeliverBatch(0);
+  first_scan_tasks[0]->Close();
+  first_scan_tasks = fragments[1]->DeliverScanTasks(1);
+  first_scan_tasks[0]->DeliverBatch(1);
+  first_scan_tasks[0]->Close();
+
+  ASSERT_FINISHES_OK_AND_ASSIGN(auto table, table_fut);
+  auto chunks = table->column(0)->chunks();
+  ASSERT_EQ(5, chunks.size());
+  ASSERT_EQ(5, table->num_rows());
+  EXPECT_EQ("0", chunks[0]->GetScalar(0).ValueOrDie()->ToString());
+  EXPECT_EQ("1", chunks[1]->GetScalar(0).ValueOrDie()->ToString());
+  EXPECT_EQ("2", chunks[2]->GetScalar(0).ValueOrDie()->ToString());
+  EXPECT_EQ("3", chunks[3]->GetScalar(0).ValueOrDie()->ToString());
+  EXPECT_EQ("4", chunks[4]->GetScalar(0).ValueOrDie()->ToString());
+}
+
 TEST_F(TestScanner, ToTable) {
   SetSchema({field("i32", int32()), field("f64", float64())});
   auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
@@ -187,6 +432,11 @@ class TestScannerBuilder : public ::testing::Test {
   std::shared_ptr<Dataset> dataset_;
 };
 
+TEST_F(TestScannerBuilder, DefaultOptions) {
+  ScannerBuilder builder(dataset_);
+  ASSERT_OK(builder.Finish());
+}
+
 TEST_F(TestScannerBuilder, TestProject) {
   ScannerBuilder builder(dataset_, options_);
 
diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h
index 86bb14b038d..b498e317a8b 100644
--- a/cpp/src/arrow/dataset/test_util.h
+++ b/cpp/src/arrow/dataset/test_util.h
@@ -40,6 +40,7 @@
 #include "arrow/filesystem/test_util.h"
 #include "arrow/record_batch.h"
 #include "arrow/table.h"
+#include "arrow/testing/future_util.h"
 #include "arrow/testing/generator.h"
 #include "arrow/testing/gtest_util.h"
 #include "arrow/util/io_util.h"
@@ -136,16 +137,29 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure that the given record batch is equal to the next record
+  /// batch yielded by the reader.
+  void AssertScanBatchEquals(RecordBatchReader* expected, RecordBatch* batch,
+                             bool ensure_drained = true) {
+    std::shared_ptr<RecordBatch> lhs;
+    ARROW_EXPECT_OK(expected->ReadNext(&lhs));
+    EXPECT_NE(lhs, nullptr);
+    AssertBatchesEqual(*lhs, *batch);
+
+    if (ensure_drained) {
+      EnsureRecordBatchReaderDrained(expected);
+    }
+  }
+
   /// \brief Ensure that record batches found in reader are equal to the
   /// record batches yielded by the data fragment.
   void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment,
                             bool ensure_drained = true) {
-    ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_));
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(options_));
 
-    ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<ScanTask> task) -> Status {
-      AssertScanTaskEquals(expected, task.get(), false);
-      return Status::OK();
-    }));
+    for (auto&& scan_task : scan_tasks) {
+      AssertScanTaskEquals(expected, scan_task.get(), false);
+    }
 
     if (ensure_drained) {
       EnsureRecordBatchReaderDrained(expected);
@@ -157,18 +171,26 @@ class DatasetFixtureMixin : public ::testing::Test {
   void AssertDatasetFragmentsEqual(RecordBatchReader* expected, Dataset* dataset,
                                    bool ensure_drained = true) {
     ASSERT_OK_AND_ASSIGN(auto predicate, options_->filter.Bind(*dataset->schema()));
-    ASSERT_OK_AND_ASSIGN(auto it, dataset->GetFragments(predicate));
+    ASSERT_FINISHES_OK_AND_ASSIGN(auto fragments, dataset->GetFragmentsAsync(predicate));
 
-    ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr<Fragment> fragment) -> Status {
+    for (auto&& fragment : fragments) {
       AssertFragmentEquals(expected, fragment.get(), false);
-      return Status::OK();
-    }));
+    }
 
     if (ensure_drained) {
       EnsureRecordBatchReaderDrained(expected);
     }
   }
 
+/// The utility below exercises the deprecated Scan function, so suppress
+/// deprecation warnings around it
+#ifdef __GNUC__
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
+#elif defined(_MSC_VER)
+#pragma warning(push)
+#pragma warning(disable : 4996)
+#endif
+
   /// \brief Ensure that record batches found in reader are equal to the
   /// record batches yielded by a scanner.
   void AssertScannerEquals(RecordBatchReader* expected, Scanner* scanner,
@@ -185,6 +207,28 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+#ifdef __GNUC__
+#pragma GCC diagnostic pop
+#elif defined(_MSC_VER)
+#pragma warning(pop)
+#endif
+
+  /// \brief Ensure that record batches found in reader are equal to the
+  /// record batches yielded by a scanner using the ScanBatches method
+  void AssertScanBatchesEquals(RecordBatchReader* expected, Scanner* scanner,
+                               bool ensure_drained = true) {
+    ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatches());
+
+    ARROW_EXPECT_OK(it.Visit([&](PositionedRecordBatch positioned_batch) -> Status {
+      AssertScanBatchEquals(expected, positioned_batch.record_batch.get(), false);
+      return Status::OK();
+    }));
+
+    if (ensure_drained) {
+      EnsureRecordBatchReaderDrained(expected);
+    }
+  }
+
   /// \brief Ensure that record batches found in reader are equal to the
   /// record batches yielded by a dataset.
void AssertDatasetEquals(RecordBatchReader* expected, Dataset* dataset, @@ -235,10 +279,10 @@ class DummyFileFormat : public FileFormat { } /// \brief Open a file for scanning (always returns an empty iterator) - Result ScanFile( + Future ScanFile( std::shared_ptr options, const std::shared_ptr& fragment) const override { - return MakeEmptyIterator>(); + return Future::MakeFinished(ScanTaskVector()); } Result> MakeWriter( @@ -274,8 +318,20 @@ class JSONRecordBatchFileFormat : public FileFormat { return resolver_(source); } + Future ScanTaskVectorFromRecordBatch( + std::vector> batches, + std::shared_ptr options) const { + if (batches.empty()) { + return Future::MakeFinished(ScanTaskVector()); + } + auto schema = batches[0]->schema(); + auto fragment = + std::make_shared(std::move(schema), std::move(batches)); + return fragment->Scan(std::move(options)); + } + /// \brief Open a file for scanning - Result ScanFile( + Future ScanFile( std::shared_ptr options, const std::shared_ptr& fragment) const override { ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open()); @@ -286,7 +342,7 @@ class JSONRecordBatchFileFormat : public FileFormat { ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source())); std::shared_ptr batch = RecordBatchFromJSON(schema, view); - return ScanTaskIteratorFromRecordBatch({batch}, std::move(options)); + return ScanTaskVectorFromRecordBatch({batch}, std::move(options)); } Result> MakeWriter( @@ -415,12 +471,12 @@ static std::vector PartitionExpressionsOf(const FragmentVector& frag void AssertFragmentsHavePartitionExpressions(std::shared_ptr dataset, std::vector expected) { - ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset->GetFragments()); + ASSERT_FINISHES_OK_AND_ASSIGN(auto fragments, dataset->GetFragmentsAsync()); for (auto& expr : expected) { ASSERT_OK_AND_ASSIGN(expr, expr.Bind(*dataset->schema())); } // Ordering is not guaranteed. 
- EXPECT_THAT(PartitionExpressionsOf(IteratorToVector(std::move(fragment_it))), + EXPECT_THAT(PartitionExpressionsOf(fragments), testing::UnorderedElementsAreArray(expected)); } @@ -724,6 +780,11 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { AssertWrittenAsExpected(); } + RecordBatchIterator IteratorFromReader( + const std::shared_ptr& reader) { + return MakeFunctionIterator([reader] { return reader->Next(); }); + } + void AssertWrittenAsExpected() { std::unordered_set expected_paths, actual_paths; for (const auto& file_contents : expected_files_) { @@ -734,10 +795,8 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { } EXPECT_THAT(actual_paths, testing::UnorderedElementsAreArray(expected_paths)); - ASSERT_OK_AND_ASSIGN(auto written_fragments_it, written_->GetFragments()); - for (auto maybe_fragment : written_fragments_it) { - ASSERT_OK_AND_ASSIGN(auto fragment, maybe_fragment); - + ASSERT_FINISHES_OK_AND_ASSIGN(auto written_fragments, written_->GetFragmentsAsync()); + for (auto fragment : written_fragments) { ASSERT_OK_AND_ASSIGN(auto actual_physical_schema, fragment->ReadPhysicalSchema()); AssertSchemaEqual(*expected_physical_schema_, *actual_physical_schema, check_metadata_); @@ -801,6 +860,11 @@ class NestedParallelismMixin : public ::testing::Test { options_->use_threads = true; } + struct VectorIterable { + Result operator()() { return MakeVectorGenerator(batches); } + RecordBatchVector batches; + }; + class NestedParallelismScanTask : public ScanTask { public: explicit NestedParallelismScanTask(std::shared_ptr target) @@ -825,45 +889,44 @@ class NestedParallelismMixin : public ::testing::Test { return MakeFromFuture(generator_fut); } - bool supports_async() const override { return true; } - private: std::shared_ptr target_; }; class NestedParallelismFragment : public InMemoryFragment { public: - explicit NestedParallelismFragment(RecordBatchVector record_batches, + explicit NestedParallelismFragment(std::shared_ptr sch, + RecordBatchVector batches, + Expression expr = literal(true)) + : InMemoryFragment(std::move(sch), std::move(batches), std::move(expr)) {} + explicit NestedParallelismFragment(std::shared_ptr sch, + RecordBatchIterable get_batches, Expression expr = literal(true)) - : InMemoryFragment(std::move(record_batches), std::move(expr)) {} - - Result Scan(std::shared_ptr options) override { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, InMemoryFragment::Scan(options)); - return MakeMaybeMapIterator( - [](std::shared_ptr task) -> Result> { - return std::make_shared(std::move(task)); - }, - std::move(scan_task_it)); + : InMemoryFragment(std::move(sch), std::move(get_batches), std::move(expr)) {} + + Future Scan(std::shared_ptr options) override { + auto scan_tasks_fut = InMemoryFragment::Scan(options); + return scan_tasks_fut.Then([](const ScanTaskVector& scan_tasks) { + return internal::MapVector( + [](const std::shared_ptr& task) -> std::shared_ptr { + return std::make_shared(std::move(task)); + }, + scan_tasks); + }); } }; class NestedParallelismDataset : public InMemoryDataset { public: NestedParallelismDataset(std::shared_ptr sch, RecordBatchVector batches) - : InMemoryDataset(std::move(sch), std::move(batches)) {} + : InMemoryDataset(std::move(sch), VectorIterable{std::move(batches)}) {} protected: - Result GetFragmentsImpl(Expression) override { + Future GetFragmentsImpl(Expression) const override { auto schema = this->schema(); - auto create_fragment = - [schema]( - std::shared_ptr batch) -> Result> { - 
RecordBatchVector batches{batch}; - return std::make_shared(std::move(batches)); - }; - - return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get()); + return FragmentVector{ + std::make_shared(this->schema(), get_batches_)}; } }; @@ -909,7 +972,7 @@ class NestedParallelismMixin : public ::testing::Test { Result> Inspect(const FileSource& source) const override { return Status::NotImplemented("Should not be called"); } - Result ScanFile( + Future ScanFile( std::shared_ptr options, const std::shared_ptr& file) const override { return Status::NotImplemented("Should not be called"); diff --git a/cpp/src/arrow/testing/future_util.h b/cpp/src/arrow/testing/future_util.h index 44fa78c375c..e64dc3e4810 100644 --- a/cpp/src/arrow/testing/future_util.h +++ b/cpp/src/arrow/testing/future_util.h @@ -67,6 +67,24 @@ ASSERT_EQ(expected, _actual); \ } while (0) +#define EXPECT_FINISHES_IMPL(fut) \ + do { \ + EXPECT_TRUE(fut.Wait(300)); \ + if (!fut.is_finished()) { \ + ADD_FAILURE() << "Future did not finish in a timely fashion"; \ + } \ + } while (false) + +#define ON_FINISH_ASSIGN_OR_HANDLE_ERROR_IMPL(handle_error, future_name, lhs, rexpr) \ + auto future_name = (rexpr); \ + EXPECT_FINISHES_IMPL(future_name); \ + handle_error(future_name.status()); \ + EXPECT_OK_AND_ASSIGN(lhs, future_name.result()); + +#define EXPECT_FINISHES_OK_AND_ASSIGN(lhs, rexpr) \ + ON_FINISH_ASSIGN_OR_HANDLE_ERROR_IMPL( \ + ARROW_EXPECT_OK, ARROW_ASSIGN_OR_RAISE_NAME(_fut, __COUNTER__), lhs, rexpr); + namespace arrow { template diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index db98243267b..49a6521eaa1 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -110,7 +110,7 @@ Future<> VisitAsyncGenerator(AsyncGenerator generator, /// \brief Waits for an async generator to complete, discarding results. template Future<> DiscardAllFromAsyncGenerator(AsyncGenerator generator) { - std::function visitor = [](...) { return Status::OK(); }; + std::function visitor = [](const T& val) { return Status::OK(); }; return VisitAsyncGenerator(generator, visitor); } @@ -254,7 +254,6 @@ class MappingGenerator { /// \brief Creates a generator that will apply the map function to each element of /// source. The map function is not called on the end token. 
/// -/// Note: This function makes a copy of `map` for each item /// Note: Errors returned from the `map` function will be propagated /// /// If the source generator is async-reentrant then this generator will be also @@ -279,6 +278,32 @@ AsyncGenerator MakeMappedGenerator(AsyncGenerator source_generator, std::function(const T&)> map) { return MappingGenerator(std::move(source_generator), std::move(map)); } +template ()(std::declval())), + typename E = typename std::enable_if::type> +AsyncGenerator MakeMappedGenerator(AsyncGenerator source_generator, + MapFunc map_fn) { + std::function map_fn_wrapper = map_fn; + return MakeMappedGenerator(std::move(source_generator), std::move(map_fn_wrapper)); +} +template ()(std::declval())), + typename V = typename VR::ValueType> +AsyncGenerator MakeMappedGenerator(AsyncGenerator source_generator, + MapFunc map_fn) { + std::function(const T&)> map_fn_wrapper = map_fn; + return MakeMappedGenerator(std::move(source_generator), std::move(map_fn_wrapper)); +} +template ()(std::declval())), + typename V = typename VR::ValueType> +// FIXME, find way to make this overload, need SFINAE to distinguish between Result +// and Future (both of which have ValueType defined) +AsyncGenerator MakeMappedGeneratorAsync(AsyncGenerator source_generator, + MapFunc map_fn) { + std::function(const T&)> map_fn_wrapper = map_fn; + return MakeMappedGenerator(std::move(source_generator), std::move(map_fn_wrapper)); +} /// \see MakeSequencingGenerator template @@ -640,6 +665,7 @@ class SerialReadaheadGenerator { std::shared_ptr state_; }; +/// \see MakeFromFuture template class FutureFirstGenerator { public: @@ -669,6 +695,12 @@ class FutureFirstGenerator { std::shared_ptr state_; }; +/// \brief Transforms a Future> into an AsyncGenerator +/// that waits for the future to complete as part of the first item. 
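+///
+/// For example (sketch): given fut = Future<AsyncGenerator<int>>, the
+/// generator returned by MakeFromFuture(std::move(fut)) may be invoked
+/// immediately; its first result simply will not complete until fut does.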
+/// +/// This generator is not async-reentrant (even if the generator yielded by future is) +/// +/// This generator does not queue template AsyncGenerator MakeFromFuture(Future> future) { return FutureFirstGenerator(std::move(future)); @@ -1032,6 +1064,25 @@ class MergedGenerator { std::shared_ptr state_; }; +/// \brief Creates a wrapper around a vector of futures to expose them as an +/// AsyncGenerator +/// +/// This generator is async reentrant +/// +/// This generator takes in a vector of results and queues that vector +template +AsyncGenerator MakeGeneratorFromFutures(const std::vector> futures) { + auto index = std::make_shared(0); + auto futures_ptr = std::make_shared>>(futures); + return [index, futures_ptr]() { + if (*index >= futures_ptr->size()) { + return AsyncGeneratorEnd(); + } + auto next_index = (*index)++; + return (*futures_ptr)[next_index]; + }; +} + /// \brief Creates a generator that takes in a stream of generators and pulls from up to /// max_subscriptions at a time /// @@ -1063,6 +1114,65 @@ AsyncGenerator MakeConcatenatedGenerator(AsyncGenerator> so return MergedGenerator(std::move(source), 1); } +template +struct Enumerated { + util::optional value; + int index; + bool last; +}; + +template +struct IterationTraits> { + static Enumerated End() { return Enumerated{{}, -1, false}; } + static bool IsEnd(const Enumerated& val) { return !val.value.has_value(); } +}; + +template +class EnumeratingGenerator { + public: + EnumeratingGenerator(AsyncGenerator source, T initial_value) + : state_(std::make_shared(std::move(source), std::move(initial_value))) {} + + Future> operator()() { + if (state_->finished) { + return AsyncGeneratorEnd>(); + } else { + auto state = state_; + return state->source().Then([state](const T& next) { + auto finished = IsIterationEnd(next); + auto prev = Enumerated{state->prev_value, state->prev_index, finished}; + state->prev_value = next; + state->prev_index++; + state->finished = finished; + return prev; + }); + } + } + + private: + struct State { + State(AsyncGenerator source, T initial_value) + : source(std::move(source)), prev_value(std::move(initial_value)), prev_index(0) { + finished = IsIterationEnd(prev_value); + } + + AsyncGenerator source; + T prev_value; + int prev_index; + bool finished; + }; + + std::shared_ptr state_; +}; + +template +AsyncGenerator> MakeEnumeratedGenerator(AsyncGenerator source) { + return FutureFirstGenerator>( + source().Then([source](const T& initial_value) -> AsyncGenerator> { + return EnumeratingGenerator(std::move(source), initial_value); + })); +} + /// \see MakeTransferredGenerator template class TransferringGenerator { diff --git a/cpp/src/arrow/util/async_generator_test.cc b/cpp/src/arrow/util/async_generator_test.cc index 518422de586..1f1053aeac1 100644 --- a/cpp/src/arrow/util/async_generator_test.cc +++ b/cpp/src/arrow/util/async_generator_test.cc @@ -219,6 +219,8 @@ ReentrantCheckerGuard ExpectNotAccessedReentrantly(AsyncGenerator* generat class GeneratorTestFixture : public ::testing::TestWithParam { protected: + AsyncGenerator MakeEmptySource() { return MakeSource({}); } + AsyncGenerator MakeSource(const std::vector& items) { std::vector wrapped(items.begin(), items.end()); auto gen = AsyncVectorIt(std::move(wrapped)); @@ -250,7 +252,6 @@ class GeneratorTestFixture : public ::testing::TestWithParam { } } }; - template class ManualIteratorControl { public: @@ -1129,6 +1130,32 @@ TEST_P(SequencerTestFixture, SequenceStress) { INSTANTIATE_TEST_SUITE_P(SequencerTests, SequencerTestFixture, 
::testing::Values(false, true)); +class EnumeratedTestFixture : public GeneratorTestFixture {}; + +TEST_P(EnumeratedTestFixture, Empty) { + auto source = MakeEmptySource(); + auto enumerated = MakeEnumeratedGenerator(std::move(source)); + AssertGeneratorExhausted(enumerated); +} + +TEST_P(EnumeratedTestFixture, Basic) { + auto source = MakeSource({1, 2}); + auto enumerated = MakeEnumeratedGenerator(std::move(source)); + + ASSERT_FINISHES_OK_AND_ASSIGN(auto first, enumerated()); + ASSERT_EQ(1, first.value->value); + ASSERT_EQ(0, first.index); + ASSERT_FALSE(first.last); + + ASSERT_FINISHES_OK_AND_ASSIGN(auto second, enumerated()); + ASSERT_EQ(2, second.value->value); + ASSERT_EQ(1, second.index); + ASSERT_TRUE(second.last); +} + +INSTANTIATE_TEST_SUITE_P(EnumeratedTests, EnumeratedTestFixture, + ::testing::Values(false, true)); + TEST(TestAsyncIteratorTransform, SkipSome) { auto original = AsyncVectorIt({1, 2, 3}); auto filter = MakeFilter([](TestInt& t) { return t.value != 2; }); diff --git a/cpp/src/arrow/util/future_test.cc b/cpp/src/arrow/util/future_test.cc index 213a4c7e9a4..c5a94aca81a 100644 --- a/cpp/src/arrow/util/future_test.cc +++ b/cpp/src/arrow/util/future_test.cc @@ -64,11 +64,15 @@ struct Foo { template <> struct IterationTraits { static Foo End() { return Foo(-1); } + static bool IsEnd(const Foo& val) { return val == IterationTraits::End(); } }; template <> struct IterationTraits { static MoveOnlyDataType End() { return MoveOnlyDataType(-1); } + static bool IsEnd(const MoveOnlyDataType& val) { + return val == IterationTraits::End(); + } }; template @@ -83,10 +87,10 @@ IteratorResults IteratorToResults(Iterator iterator) { while (true) { auto res = iterator.Next(); - if (res == IterationTraits::End()) { - break; - } if (res.ok()) { + if (IsIterationEnd(*res)) { + break; + } results.values.push_back(*std::move(res)); } else { results.errors.push_back(res.status()); @@ -1454,8 +1458,10 @@ class FutureTestBase : public ::testing::Test { ASSERT_OK_AND_EQ(0, it.Next()); executor_->SetFinishedDeferred({{1, true}}); ASSERT_OK_AND_EQ(1, it.Next()); - ASSERT_OK_AND_EQ(IterationTraits::End(), it.Next()); - ASSERT_OK_AND_EQ(IterationTraits::End(), it.Next()); // idempotent + EXPECT_OK_AND_ASSIGN(auto next, it.Next()); + ASSERT_TRUE(IsIterationEnd(next)); + EXPECT_OK_AND_ASSIGN(next, it.Next()); + ASSERT_TRUE(IsIterationEnd(next)); // idempotent } } diff --git a/cpp/src/arrow/util/test_common.cc b/cpp/src/arrow/util/test_common.cc index ac187ba0ce0..c7ec52601f9 100644 --- a/cpp/src/arrow/util/test_common.cc +++ b/cpp/src/arrow/util/test_common.cc @@ -47,7 +47,7 @@ std::ostream& operator<<(std::ostream& os, const TestStr& v) { } std::vector RangeVector(unsigned int max, unsigned int step) { - auto count = max / step; + unsigned int count = max / step; std::vector range(count); for (unsigned int i = 0; i < count; i++) { range[i] = i * step; diff --git a/cpp/src/arrow/util/vector.h b/cpp/src/arrow/util/vector.h index 67401d496e6..e7cc5092d09 100644 --- a/cpp/src/arrow/util/vector.h +++ b/cpp/src/arrow/util/vector.h @@ -133,5 +133,18 @@ Result> UnwrapOrRaise(std::vector>&& results) { return out; } +template +Result> UnwrapOrRaise(const std::vector>& results) { + std::vector out; + out.reserve(results.size()); + for (const auto& item : results) { + if (!item.ok()) { + return item.status(); + } + out.push_back(item.ValueUnsafe()); + } + return out; +} + } // namespace internal } // namespace arrow diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 
17eedb38c7f..7ce6827753b 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -487,17 +487,19 @@ cdef class InMemoryDataset(Dataset): raise ValueError('Must provide schema to construct in-memory ' 'dataset from an empty list') table = pa.Table.from_batches(batches, schema=schema) - in_memory_dataset = make_shared[CInMemoryDataset]( - pyarrow_unwrap_table(table)) + in_memory_dataset = GetResultValue( + CInMemoryDataset.FromTable(pyarrow_unwrap_table(table))) elif isinstance(source, pa.ipc.RecordBatchReader): reader = source - in_memory_dataset = make_shared[CInMemoryDataset](reader.reader) + in_memory_dataset = GetResultValue( + CInMemoryDataset.FromReader(reader.reader)) elif _is_iterable(source): if schema is None: raise ValueError('Must provide schema to construct in-memory ' 'dataset from an iterable') reader = pa.ipc.RecordBatchReader.from_batches(schema, source) - in_memory_dataset = make_shared[CInMemoryDataset](reader.reader) + in_memory_dataset = GetResultValue( + CInMemoryDataset.FromReader(reader.reader)) else: raise TypeError( 'Expected a table, batch, iterable of tables/batches, or a ' @@ -2603,22 +2605,26 @@ cdef class Scanner(_Weakrefable): ------- scan_tasks : iterator of ScanTask """ + import warnings + warnings.warn("Scanner.scan is deprecated as of 4.0.0, " + "please use Scanner.to_batches instead.", + FutureWarning) + # Make this method eager so the warning appears immediately + return self._scan() + + def _scan(self): for maybe_task in GetResultValue(self.scanner.Scan()): yield ScanTask.wrap(GetResultValue(move(maybe_task))) def to_batches(self): """Consume a Scanner in record batches. - Sequentially executes the ScanTasks as the returned generator gets - consumed. - Returns ------- record_batches : iterator of RecordBatch """ - for task in self.scan(): - for batch in task.execute(): - yield batch + for maybe_batch in GetResultValue(self.scanner.ScanBatches()): + yield pyarrow_wrap_batch(GetResultValue(maybe_batch).record_batch) def to_table(self): """Convert a Scanner into a Table. 
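The Cython declarations below mirror the new C++ surface. For orientation, here is a minimal C++ sketch of the same loop the new to_batches() performs (assuming a scanner already obtained from ScannerBuilder::Finish(); ProcessBatch is a hypothetical consumer, not part of this diff):

    Status ConsumeScanner(const std::shared_ptr<dataset::Scanner>& scanner) {
      // ScanBatches() yields filtered, projected batches in scan order.
      ARROW_ASSIGN_OR_RAISE(auto batch_it, scanner->ScanBatches());
      return batch_it.Visit([](dataset::PositionedRecordBatch positioned) -> Status {
        return ProcessBatch(positioned.record_batch);  // hypothetical sink
      });
    }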
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index db2e73acdff..4015dba82c3 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -86,10 +86,24 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         CInMemoryFragment(vector[shared_ptr[CRecordBatch]] record_batches,
                           CExpression partition_expression)
 
+    cdef cppclass CPositionedRecordBatch \
+            "arrow::dataset::PositionedRecordBatch":
+        shared_ptr[CRecordBatch] record_batch
+        shared_ptr[CFragment] fragment
+        int fragment_index
+        int scan_task_index
+        c_bool last_scan_task
+        int record_batch_index
+        c_bool last_record_batch
+
+    ctypedef CIterator[CPositionedRecordBatch] CPositionedRecordBatchIterator \
+        "arrow::dataset::PositionedRecordBatchIterator"
+
     cdef cppclass CScanner "arrow::dataset::Scanner":
         CScanner(shared_ptr[CDataset], shared_ptr[CScanOptions])
         CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions])
         CResult[CScanTaskIterator] Scan()
+        CResult[CPositionedRecordBatchIterator] ScanBatches()
         CResult[shared_ptr[CTable]] ToTable()
         CResult[CFragmentIterator] GetFragments()
         const shared_ptr[CScanOptions]& options()
@@ -126,8 +140,13 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
 
     cdef cppclass CInMemoryDataset "arrow::dataset::InMemoryDataset"(
             CDataset):
-        CInMemoryDataset(shared_ptr[CRecordBatchReader])
-        CInMemoryDataset(shared_ptr[CTable])
+
+        @staticmethod
+        CResult[shared_ptr[CInMemoryDataset]] FromReader(
+            shared_ptr[CRecordBatchReader])
+
+        @staticmethod
+        CResult[shared_ptr[CInMemoryDataset]] FromTable(shared_ptr[CTable])
 
     cdef cppclass CUnionDataset "arrow::dataset::UnionDataset"(
             CDataset):
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index a7dd1520168..5989e2546ab 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -362,6 +362,9 @@ def test_scanner(dataset):
         for batch in task.execute():
             assert batch.num_columns == 1
 
+    with pytest.warns(FutureWarning):
+        scanner.scan()
+
 
 def test_abstract_classes():
     classes = [
@@ -1699,26 +1702,27 @@ def test_construct_in_memory():
     assert next(dataset.get_fragments()).to_table() == table
 
     # When constructed from readers/iterators, should be one-shot
-    match = "InMemoryDataset was already consumed"
+    match = (
+        "A dataset created from a RecordBatchReader"
+        " can only be scanned once"
+    )
     for factory in (
        lambda: pa.ipc.RecordBatchReader.from_batches(
            batch.schema, [batch]),
        lambda: (batch for _ in range(1)),
     ):
         dataset = ds.dataset(factory(), schema=batch.schema)
-        # Getting fragments consumes the underlying iterator
+        # Getting fragments does not consume the underlying iterator
         fragments = list(dataset.get_fragments())
         assert len(fragments) == 1
         assert fragments[0].to_table() == table
+        fragments = list(dataset.get_fragments())
+        assert len(fragments) == 1
         with pytest.raises(pa.ArrowInvalid, match=match):
-            list(dataset.get_fragments())
-        with pytest.raises(pa.ArrowInvalid, match=match):
-            dataset.to_table()
+            fragments[0].to_table()
 
         # Materializing consumes the underlying iterator
         dataset = ds.dataset(factory(), schema=batch.schema)
         assert dataset.to_table() == table
-        with pytest.raises(pa.ArrowInvalid, match=match):
-            list(dataset.get_fragments())
         with pytest.raises(pa.ArrowInvalid, match=match):
             dataset.to_table()
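The `CPositionedRecordBatch` fields declared above mirror the C++ `arrow::dataset::PositionedRecordBatch` struct. A sketch of how a consumer might use the positional flags, under the assumption (not stated in the patch) that `last_record_batch` marks the final batch of a scan task and `last_scan_task` the final task of a fragment; the `CountFragments` helper is illustrative:

```cpp
#include <memory>

#include "arrow/dataset/api.h"
#include "arrow/result.h"

// Count fragments by watching the positional flags on each scanned batch:
// a batch that is the last of its scan task, within the last scan task of
// its fragment, is assumed here to close out one fragment.
arrow::Result<int> CountFragments(
    const std::shared_ptr<arrow::dataset::Scanner>& scanner) {
  ARROW_ASSIGN_OR_RAISE(auto it, scanner->ScanBatches());
  int fragments_seen = 0;
  for (auto maybe_positioned : it) {
    ARROW_ASSIGN_OR_RAISE(auto positioned, maybe_positioned);
    if (positioned.last_record_batch && positioned.last_scan_task) {
      ++fragments_seen;
    }
  }
  return fragments_seen;
}
```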
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index 2c7bf5c19f6..ae230f7fb62 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -504,6 +504,10 @@ dataset___Scanner__Scan <- function(scanner){
     .Call(`_arrow_dataset___Scanner__Scan`, scanner)
 }
 
+dataset___Scanner__ScanBatches <- function(scanner){
+    .Call(`_arrow_dataset___Scanner__ScanBatches`, scanner)
+}
+
 dataset___Scanner__schema <- function(sc){
     .Call(`_arrow_dataset___Scanner__schema`, sc)
 }
diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R
index f7ede663c7f..f2db9b504cb 100644
--- a/r/R/dataset-scan.R
+++ b/r/R/dataset-scan.R
@@ -48,15 +48,20 @@
 #' - `$schema`: Active binding, returns the [Schema] of the Dataset
 #' - `$Finish()`: Returns a `Scanner`
 #'
-#' `Scanner` currently has a single method, `$ToTable()`, which evaluates the
-#' query and returns an Arrow [Table].
+#' `Scanner` has the following methods:
+#' - `$ToTable()`: Evaluates the query and returns an Arrow [Table]
+#' - `$ScanBatches()`: Evaluates the query and returns a list of Arrow [RecordBatch]es
 #' @rdname Scanner
 #' @name Scanner
 #' @export
 Scanner <- R6Class("Scanner", inherit = ArrowObject,
   public = list(
     ToTable = function() dataset___Scanner__ToTable(self),
-    Scan = function() dataset___Scanner__Scan(self)
+    Scan = function() {
+      .Deprecated("ScanBatches")
+      dataset___Scanner__Scan(self)
+    },
+    ScanBatches = function() dataset___Scanner__ScanBatches(self)
   ),
   active = list(
     schema = function() dataset___Scanner__schema(self)
@@ -141,18 +146,13 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
   }
   scanner <- Scanner$create(ensure_group_vars(X))
   FUN <- as_mapper(FUN)
-  # message("Making ScanTasks")
-  lapply(scanner$Scan(), function(scan_task) {
-    # This outer lapply could be parallelized
-    # message("Making Batches")
-    lapply(scan_task$Execute(), function(batch) {
-      # message("Processing Batch")
-      # This inner lapply cannot be parallelized
-      # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
-      # X$temp_columns, and X$group_by_vars
-      # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
-      FUN(batch, ...)
-    })
+  lapply(scanner$ScanBatches(), function(batch) {
+    # message("Processing Batch")
+    # This lapply cannot be parallelized
+    # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
+    # X$temp_columns, and X$group_by_vars
+    # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
+    FUN(batch, ...)
   })
 }
diff --git a/r/R/dataset.R b/r/R/dataset.R
index 3f7d117d6f6..ddf47be38b9 100644
--- a/r/R/dataset.R
+++ b/r/R/dataset.R
@@ -274,13 +274,10 @@ tail.Dataset <- function(x, n = 6L, ...) {
   result <- list()
   batch_num <- 0
   scanner <- Scanner$create(ensure_group_vars(x))
-  for (scan_task in rev(dataset___Scanner__Scan(scanner))) {
-    for (batch in rev(scan_task$Execute())) {
-      batch_num <- batch_num + 1
-      result[[batch_num]] <- tail(batch, n)
-      n <- n - nrow(batch)
-      if (n <= 0) break
-    }
+  for (batch in rev(dataset___Scanner__ScanBatches(scanner))) {
+    batch_num <- batch_num + 1
+    result[[batch_num]] <- tail(batch, n)
+    n <- n - nrow(batch)
     if (n <= 0) break
   }
   Table$create(!!!rev(result))
@@ -311,17 +308,14 @@ take_dataset_rows <- function(x, i) {
   result_order <- order(i)
   i <- sort(i) - 1L
   scanner <- Scanner$create(ensure_group_vars(x))
-  for (scan_task in dataset___Scanner__Scan(scanner)) {
-    for (batch in scan_task$Execute()) {
-      # Take all rows that are in this batch
-      this_batch_nrows <- batch$num_rows
-      in_this_batch <- i > -1L & i < this_batch_nrows
-      if (any(in_this_batch)) {
-        result[[length(result) + 1L]] <- batch$Take(i[in_this_batch])
-      }
-      i <- i - this_batch_nrows
-      if (all(i < 0L)) break
+  for (batch in dataset___Scanner__ScanBatches(scanner)) {
+    # Take all rows that are in this batch
+    this_batch_nrows <- batch$num_rows
+    in_this_batch <- i > -1L & i < this_batch_nrows
+    if (any(in_this_batch)) {
+      result[[length(result) + 1L]] <- batch$Take(i[in_this_batch])
     }
+    i <- i - this_batch_nrows
     if (all(i < 0L)) break
   }
   tab <- Table$create(!!!result)
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index b06a2696e50..2d1c33f25de 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -1401,6 +1401,21 @@ extern "C" SEXP _arrow_dataset___Scanner__Scan(SEXP scanner_sexp){
 }
 #endif
 
+// dataset.cpp
+#if defined(ARROW_R_WITH_DATASET)
+cpp11::list dataset___Scanner__ScanBatches(const std::shared_ptr<ds::Scanner>& scanner);
+extern "C" SEXP _arrow_dataset___Scanner__ScanBatches(SEXP scanner_sexp){
+BEGIN_CPP11
+  arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
+  return cpp11::as_sexp(dataset___Scanner__ScanBatches(scanner));
+END_CPP11
+}
+#else
+extern "C" SEXP _arrow_dataset___Scanner__ScanBatches(SEXP scanner_sexp){
+  Rf_error("Cannot call dataset___Scanner__ScanBatches(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries.");
"); +} +#endif + // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___Scanner__schema(const std::shared_ptr& sc); @@ -4319,6 +4334,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___Scanner__ToTable", (DL_FUNC) &_arrow_dataset___Scanner__ToTable, 1}, { "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, { "_arrow_dataset___Scanner__Scan", (DL_FUNC) &_arrow_dataset___Scanner__Scan, 1}, + { "_arrow_dataset___Scanner__ScanBatches", (DL_FUNC) &_arrow_dataset___Scanner__ScanBatches, 1}, { "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, { "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1}, { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 6}, diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 89c3e4d56d8..9c498e43356 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -98,7 +98,7 @@ std::shared_ptr dataset___UnionDataset__create( // [[dataset::export]] std::shared_ptr dataset___InMemoryDataset__create( const std::shared_ptr& table) { - return std::make_shared(table); + return ds::InMemoryDataset::FromTable(table); } // [[dataset::export]] @@ -395,6 +395,15 @@ std::shared_ptr dataset___Scanner__ToTable( return ValueOrStop(scanner->ToTable()); } +// TODO (ARROW-11782) Remove calls to Scan() +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#elif defined(_MSC_VER) +#pragma warning(push) +#pragma warning(disable : 4996) +#endif + // [[dataset::export]] std::shared_ptr dataset___Scanner__head( const std::shared_ptr& scanner, int n) { @@ -428,6 +437,24 @@ cpp11::list dataset___Scanner__Scan(const std::shared_ptr& scanner) return arrow::r::to_r_list(out); } +#if !(defined(_WIN32) || defined(__CYGWIN__)) +#pragma GCC diagnostic pop +#elif _MSC_VER +#pragma warning(pop) +#endif + +// [[dataset::export]] +cpp11::list dataset___Scanner__ScanBatches(const std::shared_ptr& scanner) { + auto it = ValueOrStop(scanner->ScanBatches()); + std::vector> out; + for (auto maybe_positioned_batch : it) { + auto positioned_batch = ValueOrStop(maybe_positioned_batch); + out.push_back(positioned_batch.record_batch); + } + + return arrow::r::to_r_list(out); +} + // [[dataset::export]] std::shared_ptr dataset___Scanner__schema( const std::shared_ptr& sc) { diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 3f2d63f89d6..d2c86f383c8 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -1209,6 +1209,21 @@ test_that("Dataset and query print methods", { ) }) +test_that("Scanner$Scan is deprecated", { + ds <- open_dataset(ipc_dir, partitioning = "part", format = "feather") + expect_deprecated( + ds$NewScan()$Finish()$Scan(), + "ScanBatches" + ) +}) + +test_that("Scanner$ScanBatches", { + ds <- open_dataset(ipc_dir, format = "feather") + batches <- ds$NewScan()$Finish()$ScanBatches() + table <- Table$create(!!!batches) + expect_equivalent(as.data.frame(table), rbind(df1, df2)) +}) + expect_scan_result <- function(ds, schm) { sb <- ds$NewScan() expect_is(sb, "ScannerBuilder")