diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 37d0da0bbc5..d80165d6f66 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -30,34 +30,41 @@ namespace arrow { namespace dataset { -Fragment::Fragment(std::shared_ptr scan_options) - : scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {} +Fragment::Fragment(std::shared_ptr partition_expression) + : partition_expression_(partition_expression ? partition_expression : scalar(true)) {} -const std::shared_ptr& Fragment::schema() const { - return scan_options_->schema(); -} +Result> InMemoryFragment::ReadPhysicalSchema() { return schema_; } -InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options) - : Fragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {} +InMemoryFragment::InMemoryFragment(std::shared_ptr schema, + RecordBatchVector record_batches, + std::shared_ptr partition_expression) + : Fragment(std::move(partition_expression)), + schema_(std::move(schema)), + record_batches_(std::move(record_batches)) {} InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options, std::shared_ptr partition_expression) - : Fragment(std::move(scan_options), std::move(partition_expression)), - record_batches_(std::move(record_batches)) {} + : InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(), + std::move(record_batches), std::move(partition_expression)) {} -Result InMemoryFragment::Scan(std::shared_ptr context) { +Result InMemoryFragment::Scan(std::shared_ptr options, + std::shared_ptr context) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. 
auto batches_it = MakeVectorIterator(record_batches_); + auto batch_size = options->batch_size; // RecordBatch -> ScanTask - auto scan_options = scan_options_; auto fn = [=](std::shared_ptr batch) -> std::shared_ptr { - RecordBatchVector batches{batch}; + RecordBatchVector batches; + + auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size); + for (int i = 0; i < n_batches; i++) { + batches.push_back(batch->Slice(batch_size * i, batch_size)); + } + return ::arrow::internal::make_unique( - std::move(batches), std::move(scan_options), std::move(context)); + std::move(batches), std::move(options), std::move(context)); }; return MakeMapIterator(fn, std::move(batches_it)); @@ -72,36 +79,15 @@ Result> Dataset::NewScan() { return NewScan(std::make_shared()); } -bool Dataset::AssumePartitionExpression( - const std::shared_ptr& scan_options, - std::shared_ptr* simplified_scan_options) const { - if (partition_expression_ == nullptr) { - if (simplified_scan_options != nullptr) { - *simplified_scan_options = scan_options; - } - return true; - } - - auto expr = scan_options->filter->Assume(*partition_expression_); - if (expr->IsNull() || expr->Equals(false)) { - // selector is not satisfiable; yield no fragments - return false; - } +FragmentIterator Dataset::GetFragments() { return GetFragments(scalar(true)); } - if (simplified_scan_options != nullptr) { - auto copy = std::make_shared(*scan_options); - copy->filter = std::move(expr); - *simplified_scan_options = std::move(copy); +FragmentIterator Dataset::GetFragments(std::shared_ptr predicate) { + if (partition_expression_) { + predicate = predicate->Assume(*partition_expression_); } - return true; -} -FragmentIterator Dataset::GetFragments(std::shared_ptr scan_options) { - std::shared_ptr simplified_scan_options; - if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) { - return MakeEmptyIterator>(); - } - return GetFragmentsImpl(std::move(simplified_scan_options)); + return predicate->IsSatisfiable() ? 
GetFragmentsImpl(std::move(predicate))
+                                       : MakeEmptyIterator<std::shared_ptr<Fragment>>();
 }
 
 struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
@@ -141,28 +127,18 @@ Result<std::shared_ptr<Dataset>> InMemoryDataset::ReplaceSchema(
   return std::make_shared<InMemoryDataset>(std::move(schema), get_batches_);
 }
 
-FragmentIterator InMemoryDataset::GetFragmentsImpl(
-    std::shared_ptr<ScanOptions> scan_options) {
+FragmentIterator InMemoryDataset::GetFragmentsImpl(std::shared_ptr<Expression>) {
   auto schema = this->schema();
 
   auto create_fragment =
-      [scan_options,
-       schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
+      [schema](std::shared_ptr<RecordBatch> batch) -> Result<std::shared_ptr<Fragment>> {
     if (!batch->schema()->Equals(schema)) {
       return Status::TypeError("yielded batch had schema ", *batch->schema(),
                                " which did not match InMemorySource's: ", *schema);
     }
 
-    RecordBatchVector batches;
-
-    auto batch_size = scan_options->batch_size;
-    auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size);
-
-    for (int i = 0; i < n_batches; i++) {
-      batches.push_back(batch->Slice(batch_size * i, batch_size));
-    }
-
-    return std::make_shared<InMemoryFragment>(std::move(batches), scan_options);
+    RecordBatchVector batches{batch};
+    return std::make_shared<InMemoryFragment>(std::move(batches));
   };
 
   return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get());
@@ -192,8 +168,8 @@ Result<std::shared_ptr<Dataset>> UnionDataset::ReplaceSchema(
       new UnionDataset(std::move(schema), std::move(children)));
 }
 
-FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr<ScanOptions> options) {
-  return GetFragmentsFromDatasets(children_, options);
+FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr<Expression> predicate) {
+  return GetFragmentsFromDatasets(children_, predicate);
 }
 
 }  // namespace dataset
diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h
index 740263df3f8..6a82d529fb8 100644
--- a/cpp/src/arrow/dataset/dataset.h
+++ b/cpp/src/arrow/dataset/dataset.h
@@ -30,12 +30,23 @@ namespace arrow {
 namespace dataset {
 
-/// \brief A granular piece of a Dataset, such as an individual file, which can be
-/// read/scanned separately from other fragments.
+/// \brief A granular piece of a Dataset, such as an individual file.
 ///
-/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks.
+/// A Fragment can be read/scanned separately from other fragments. It yields a
+/// collection of RecordBatches when scanned, encapsulated in one or more
+/// ScanTasks.
+///
+/// Note that Fragments have well-defined physical schemas which are reconciled by
+/// the Datasets which contain them; these physical schemas may differ from a parent
+/// Dataset's schema and from the physical schemas of sibling Fragments.
 class ARROW_DS_EXPORT Fragment {
  public:
+  /// \brief Return the physical schema of the Fragment.
+  ///
+  /// The physical schema is also called the writer schema.
+  /// This method is blocking and may be slow on a high-latency filesystem.
+  virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchema() = 0;
+
   /// \brief Scan returns an iterator of ScanTasks, each of which yields
   /// RecordBatches from this Fragment.
   ///
@@ -46,36 +57,25 @@ class ARROW_DS_EXPORT Fragment {
   /// columns may be absent if they were not present in this fragment.
   ///
   /// To receive a record batch stream which is fully filtered and projected, use Scanner.
-  virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) = 0;
+  virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
+                                        std::shared_ptr<ScanContext> context) = 0;
 
   /// \brief Return true if the fragment can benefit from parallel scanning.
virtual bool splittable() const = 0; virtual std::string type_name() const = 0; - /// \brief Filtering, schema reconciliation, and partition options to use when - /// scanning this fragment. - const std::shared_ptr& scan_options() const { return scan_options_; } - - const std::shared_ptr& schema() const; - - virtual ~Fragment() = default; - /// \brief An expression which evaluates to true for all data viewed by this /// Fragment. const std::shared_ptr& partition_expression() const { return partition_expression_; } - protected: - explicit Fragment(std::shared_ptr scan_options); + virtual ~Fragment() = default; - Fragment(std::shared_ptr scan_options, - std::shared_ptr partition_expression) - : scan_options_(std::move(scan_options)), - partition_expression_(std::move(partition_expression)) {} + protected: + explicit Fragment(std::shared_ptr partition_expression = NULLPTR); - std::shared_ptr scan_options_; std::shared_ptr partition_expression_; }; @@ -83,33 +83,39 @@ class ARROW_DS_EXPORT Fragment { /// RecordBatch. class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: - InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options); + InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, + std::shared_ptr = NULLPTR); + explicit InMemoryFragment(RecordBatchVector record_batches, + std::shared_ptr = NULLPTR); - InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options, - std::shared_ptr partition_expression); + Result> ReadPhysicalSchema() override; - Result Scan(std::shared_ptr context) override; + Result Scan(std::shared_ptr options, + std::shared_ptr context) override; bool splittable() const override { return false; } std::string type_name() const override { return "in-memory"; } protected: + std::shared_ptr schema_; RecordBatchVector record_batches_; }; -/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism -/// of Fragments and partitions, e.g. files deeply nested in a directory. +/// \brief A container of zero or more Fragments. +/// +/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a +/// directory. A Dataset has a schema to which Fragments must align during a +/// scan operation. This is analogous to Avro's reader and writer schema. class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: /// \brief Begin to build a new Scan operation against this Dataset Result> NewScan(std::shared_ptr context); Result> NewScan(); - /// \brief GetFragments returns an iterator of Fragments given ScanOptions. - FragmentIterator GetFragments(std::shared_ptr options); + /// \brief GetFragments returns an iterator of Fragments given a predicate. + FragmentIterator GetFragments(std::shared_ptr predicate); + FragmentIterator GetFragments(); const std::shared_ptr& schema() const { return schema_; } @@ -138,13 +144,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { : schema_(std::move(schema)), partition_expression_(std::move(e)) {} Dataset() = default; - virtual FragmentIterator GetFragmentsImpl(std::shared_ptr options) = 0; - - /// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded - /// fragments. Returns false if the selector is not satisfiable in this Dataset. 
- virtual bool AssumePartitionExpression( - const std::shared_ptr& scan_options, - std::shared_ptr* simplified_scan_options) const; + virtual FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) = 0; std::shared_ptr schema_; std::shared_ptr partition_expression_; @@ -176,7 +176,7 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset { std::shared_ptr schema) const override; protected: - FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) override; std::shared_ptr get_batches_; }; @@ -200,7 +200,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset { std::shared_ptr schema) const override; protected: - FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) override; explicit UnionDataset(std::shared_ptr schema, DatasetVector children) : Dataset(std::move(schema)), children_(std::move(children)) {} diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 4b349cffc43..f642462e3b1 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -35,13 +35,13 @@ namespace dataset { /// \brief GetFragmentsFromDatasets transforms a vector into a /// flattened FragmentIterator. static inline FragmentIterator GetFragmentsFromDatasets( - const DatasetVector& datasets, std::shared_ptr options) { + const DatasetVector& datasets, std::shared_ptr predicate) { // Iterator auto datasets_it = MakeVectorIterator(datasets); // Dataset -> Iterator - auto fn = [options](std::shared_ptr dataset) -> FragmentIterator { - return dataset->GetFragments(options); + auto fn = [predicate](std::shared_ptr dataset) -> FragmentIterator { + return dataset->GetFragments(predicate); }; // Iterator> diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index bcaacca6b17..4be6e33e2c0 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -44,8 +44,7 @@ TEST_F(TestInMemoryFragment, Scan) { auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch); // Creates a InMemoryFragment of the same repeated batch. 
- auto fragment = - InMemoryFragment({static_cast(kNumberBatches), batch}, options_); + auto fragment = InMemoryFragment({static_cast(kNumberBatches), batch}); AssertFragmentEquals(reader.get(), &fragment); } diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index f7a3281cb0c..92c0c3fb357 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -150,7 +150,7 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest { } options_ = ScanOptions::Make(schema); ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema)); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), paths); + AssertFragmentsAreFromPath(dataset_->GetFragments(), paths); } protected: diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index fee471d975f..a744bb4eb88 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -55,26 +55,30 @@ Result> FileSource::OpenWritable() cons return std::make_shared<::arrow::io::BufferOutputStream>(b); } -Result> FileFormat::MakeFragment( - FileSource source, std::shared_ptr options) { - return MakeFragment(std::move(source), std::move(options), scalar(true)); +Result> FileFormat::MakeFragment(FileSource source) { + return MakeFragment(std::move(source), scalar(true)); } Result> FileFormat::MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression) { + FileSource source, std::shared_ptr partition_expression) { return std::shared_ptr(new FileFragment( - std::move(source), shared_from_this(), options, std::move(partition_expression))); + std::move(source), shared_from_this(), std::move(partition_expression))); } Result> FileFormat::WriteFragment( FileSource destination, std::shared_ptr fragment, + std::shared_ptr scan_options, std::shared_ptr scan_context) { return Status::NotImplemented("writing fragment of format ", type_name()); } -Result FileFragment::Scan(std::shared_ptr context) { - return format_->ScanFile(source_, scan_options_, std::move(context)); +Result> FileFragment::ReadPhysicalSchema() { + return format_->Inspect(source_); +} + +Result FileFragment::Scan(std::shared_ptr options, + std::shared_ptr context) { + return format_->ScanFile(source_, std::move(options), std::move(context)); } FileSystemDataset::FileSystemDataset(std::shared_ptr schema, @@ -167,11 +171,9 @@ std::shared_ptr FoldingAnd(const std::shared_ptr& l, } FragmentIterator FileSystemDataset::GetFragmentsImpl( - std::shared_ptr root_options) { + std::shared_ptr predicate) { FragmentVector fragments; - std::vector> options(forest_.size()); - ExpressionVector fragment_partitions(forest_.size()); auto collect_fragments = [&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune { @@ -180,33 +182,23 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( // if available, copy parent's filter and projector // (which are appropriately simplified and loaded with default values) if (auto parent = ref.parent()) { - options[ref.i].reset(new ScanOptions(*options[parent.i])); - fragment_partitions[ref.i] = - FoldingAnd(fragment_partitions[parent.i], partitions_[ref.i]); + fragment_partitions[ref.i] = FoldingAnd(fragment_partitions[parent.i], partition); } else { - options[ref.i].reset(new ScanOptions(*root_options)); - fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partitions_[ref.i]); + fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partition); } - // simplify filter by partition 
information - auto filter = options[ref.i]->filter->Assume(partition); - options[ref.i]->filter = filter; - - if (filter->IsNull() || filter->Equals(false)) { + auto simplified_predicate = predicate->Assume(partition); + if (!simplified_predicate->IsSatisfiable()) { // directories (and descendants) which can't satisfy the filter are pruned return fs::PathForest::Prune; } - // if possible, extract a partition key and pass it to the projector - RETURN_NOT_OK(KeyValuePartitioning::SetDefaultValuesFromKeys( - *partition, &options[ref.i]->projector)); - if (ref.info().IsFile()) { // generate a fragment for this file FileSource src(ref.info().path(), filesystem_.get()); - ARROW_ASSIGN_OR_RAISE(auto fragment, - format_->MakeFragment(std::move(src), options[ref.i], - std::move(fragment_partitions[ref.i]))); + ARROW_ASSIGN_OR_RAISE( + auto fragment, + format_->MakeFragment(std::move(src), std::move(fragment_partitions[ref.i]))); fragments.push_back(std::move(fragment)); } @@ -222,9 +214,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( } Result> FileSystemDataset::Write( - const WritePlan& plan, std::shared_ptr scan_context) { - std::vector> options(plan.paths.size()); - + const WritePlan& plan, std::shared_ptr scan_options, + std::shared_ptr scan_context) { auto filesystem = plan.filesystem; if (filesystem == nullptr) { filesystem = std::make_shared(); @@ -251,9 +242,9 @@ Result> FileSystemDataset::Write( const auto& fragment = util::get>(op); FileSource dest(files[i].path(), filesystem.get()); - ARROW_ASSIGN_OR_RAISE( - auto write_task, - plan.format->WriteFragment(std::move(dest), fragment, scan_context)); + ARROW_ASSIGN_OR_RAISE(auto write_task, + plan.format->WriteFragment(std::move(dest), fragment, + scan_options, scan_context)); task_group->Append([write_task] { return write_task->Execute(); }); } diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index e6d893193ff..5a60dfd6f83 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -144,23 +144,25 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression); + FileSource source, std::shared_ptr partition_expression); - Result> MakeFragment( - FileSource source, std::shared_ptr options); + Result> MakeFragment(FileSource source); /// \brief Write a fragment. If the parent directory of destination does not exist, it /// will be created. 
virtual Result> WriteFragment( FileSource destination, std::shared_ptr fragment, + std::shared_ptr options, std::shared_ptr scan_context); // FIXME(bkietz) make this pure virtual }; /// \brief A Fragment that is stored in a file with a known format class ARROW_DS_EXPORT FileFragment : public Fragment { public: - Result Scan(std::shared_ptr context) override; + Result> ReadPhysicalSchema() override; + + Result Scan(std::shared_ptr options, + std::shared_ptr context) override; std::string type_name() const override { return format_->type_name(); } bool splittable() const override { return format_->splittable(); } @@ -170,9 +172,8 @@ class ARROW_DS_EXPORT FileFragment : public Fragment { protected: FileFragment(FileSource source, std::shared_ptr format, - std::shared_ptr scan_options, std::shared_ptr partition_expression) - : Fragment(std::move(scan_options), std::move(partition_expression)), + : Fragment(std::move(partition_expression)), source_(std::move(source)), format_(std::move(format)) {} @@ -240,7 +241,8 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { /// \param[in] plan the WritePlan to execute. /// \param[in] scan_context context in which to scan fragments before writing. static Result> Write( - const WritePlan& plan, std::shared_ptr scan_context); + const WritePlan& plan, std::shared_ptr scan_options, + std::shared_ptr scan_context); std::string type_name() const override { return "filesystem"; } @@ -256,7 +258,7 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { std::string ToString() const; protected: - FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) override; FileSystemDataset(std::shared_ptr schema, std::shared_ptr root_partition, diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 5d1b531a0a9..0ce312eaed5 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -158,44 +158,54 @@ Result IpcFileFormat::ScanFile( return IpcScanTaskIterator::Make(options, context, source); } -Result> IpcFileFormat::WriteFragment( - FileSource destination, std::shared_ptr fragment, - std::shared_ptr scan_context) { - struct Task : WriteTask { - Task(FileSource destination, std::shared_ptr format, - std::shared_ptr fragment, std::shared_ptr scan_context) - : WriteTask(std::move(destination), std::move(format)), - fragment_(std::move(fragment)), - scan_context_(std::move(scan_context)) {} +class IpcWriteTask : public WriteTask { + public: + IpcWriteTask(FileSource destination, std::shared_ptr format, + std::shared_ptr fragment, + std::shared_ptr scan_options, + std::shared_ptr scan_context) + : WriteTask(std::move(destination), std::move(format)), + fragment_(std::move(fragment)), + scan_options_(std::move(scan_options)), + scan_context_(std::move(scan_context)) {} - Status Execute() override { - RETURN_NOT_OK(CreateDestinationParentDir()); + Status Execute() override { + RETURN_NOT_OK(CreateDestinationParentDir()); - ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable()); - ARROW_ASSIGN_OR_RAISE(auto writer, - ipc::NewFileWriter(out_stream.get(), fragment_->schema())); - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment_->Scan(scan_context_)); + auto schema = scan_options_->schema(); - for (auto maybe_scan_task : scan_task_it) { - ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); + ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable()); + ARROW_ASSIGN_OR_RAISE(auto writer, 
ipc::NewFileWriter(out_stream.get(), schema)); + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, + fragment_->Scan(scan_options_, scan_context_)); - ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); + for (auto maybe_scan_task : scan_task_it) { + ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); - for (auto maybe_batch : batch_it) { - ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch)); - RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); - } - } + ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); - return writer->Close(); + for (auto maybe_batch : batch_it) { + ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch)); + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + } } - std::shared_ptr fragment_; - std::shared_ptr scan_context_; - }; + return writer->Close(); + } + + private: + std::shared_ptr fragment_; + std::shared_ptr scan_options_; + std::shared_ptr scan_context_; +}; - return std::make_shared(std::move(destination), shared_from_this(), - std::move(fragment), std::move(scan_context)); +Result> IpcFileFormat::WriteFragment( + FileSource destination, std::shared_ptr fragment, + std::shared_ptr scan_options, + std::shared_ptr scan_context) { + return std::make_shared(std::move(destination), shared_from_this(), + std::move(fragment), std::move(scan_options), + std::move(scan_context)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 57c98f54d24..6eef2ac8ed2 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -47,6 +47,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { Result> WriteFragment( FileSource destination, std::shared_ptr fragment, + std::shared_ptr options, std::shared_ptr context) override; }; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 8b1d88491d7..1247eb109e7 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -99,7 +99,7 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); return Batches(std::move(scan_task_it)); } @@ -115,7 +115,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -132,11 +132,12 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); - EXPECT_OK_AND_ASSIGN(auto write_task, format_->WriteFragment(sink, fragment, ctx_)); + EXPECT_OK_AND_ASSIGN(auto write_task, + format_->WriteFragment(sink, fragment, opts_, ctx_)); ASSERT_OK(write_task->Execute()); @@ -187,13 +188,13 @@ TEST_F(TestIpcFileSystemDataset, Write) { auto partitioning_factory = DirectoryPartitioning::MakeFactory({"str", "i32"}); ASSERT_OK_AND_ASSIGN( - auto plan, partitioning_factory->MakeWritePlan(dataset_->GetFragments(options_))); + auto plan, partitioning_factory->MakeWritePlan(schema, dataset_->GetFragments())); 
plan.format = format_; plan.filesystem = fs_; plan.partition_base_dir = "new_root/"; - ASSERT_OK_AND_ASSIGN(auto written, FileSystemDataset::Write(plan, ctx_)); + ASSERT_OK_AND_ASSIGN(auto written, FileSystemDataset::Write(plan, opts_, ctx_)); using E = TestExpression; std::vector actual_partitions; @@ -249,7 +250,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -282,7 +283,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) { auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()}; for (auto reader : readers) { auto source = GetFileSource(reader); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); // NB: projector is applied by the scanner; Fragment does not evaluate it. // We will not drop "i32" even though it is not in the projector's schema. diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 107f9c42f1e..0c895784ff3 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -220,8 +220,7 @@ class RowGroupSkipper { } auto stats_expr = maybe_stats_expr.ValueOrDie(); - auto expr = filter_->Assume(stats_expr); - return (expr->IsNull() || expr->Equals(false)); + return !filter_->Assume(stats_expr)->IsSatisfiable(); } std::shared_ptr metadata_; @@ -402,23 +401,21 @@ Result ParquetFileFormat::ScanFile( } Result> ParquetFileFormat::MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression, std::vector row_groups) { + FileSource source, std::shared_ptr partition_expression, + std::vector row_groups) { return std::shared_ptr( - new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options), + new ParquetFileFragment(std::move(source), shared_from_this(), std::move(partition_expression), std::move(row_groups))); } Result> ParquetFileFormat::MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression) { - return std::shared_ptr( - new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options), - std::move(partition_expression), {})); + FileSource source, std::shared_ptr partition_expression) { + return std::shared_ptr(new ParquetFileFragment( + std::move(source), shared_from_this(), std::move(partition_expression), {})); } Result ParquetFileFormat::GetRowGroupFragments( - const ParquetFileFragment& fragment, std::shared_ptr extra_filter) { + const ParquetFileFragment& fragment, std::shared_ptr filter) { auto properties = MakeReaderProperties(*this); ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(fragment.source(), std::move(properties))); @@ -433,26 +430,22 @@ Result ParquetFileFormat::GetRowGroupFragments( } FragmentVector fragments(row_groups.size()); - auto new_options = std::make_shared(*fragment.scan_options()); - if (!extra_filter->Equals(true)) { - new_options->filter = and_(std::move(extra_filter), std::move(new_options->filter)); - } - RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), - new_options->filter, std::move(row_groups)); + std::move(filter), std::move(row_groups)); for (int i = 0, row_group = skipper.Next(); row_group != 
RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
-    ARROW_ASSIGN_OR_RAISE(fragments[i++],
-                          MakeFragment(fragment.source(), new_options,
-                                       fragment.partition_expression(), {row_group}));
+    ARROW_ASSIGN_OR_RAISE(
+        fragments[i++],
+        MakeFragment(fragment.source(), fragment.partition_expression(), {row_group}));
   }
 
   return MakeVectorIterator(std::move(fragments));
 }
 
-Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanContext> context) {
-  return parquet_format().ScanFile(source_, scan_options_, std::move(context),
+Result<ScanTaskIterator> ParquetFileFragment::Scan(std::shared_ptr<ScanOptions> options,
+                                                   std::shared_ptr<ScanContext> context) {
+  return parquet_format().ScanFile(source_, std::move(options), std::move(context),
                                    row_groups_);
 }
 
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index a1ab047c8d1..6322e6d39a8 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -100,25 +100,29 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
   using FileFormat::MakeFragment;
 
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<ScanOptions> options,
-      std::shared_ptr<Expression> partition_expression) override;
+      FileSource source, std::shared_ptr<Expression> partition_expression) override;
 
   /// \brief Create a Fragment, restricted to the specified row groups.
   Result<std::shared_ptr<FileFragment>> MakeFragment(
-      FileSource source, std::shared_ptr<ScanOptions> options,
-      std::shared_ptr<Expression> partition_expression, std::vector<int> row_groups);
+      FileSource source, std::shared_ptr<Expression> partition_expression,
+      std::vector<int> row_groups);
 
   /// \brief Split a ParquetFileFragment into a Fragment for each row group.
-  /// Row groups whose metadata contradicts the fragment's filter or the extra_filter
-  /// will be excluded.
+  ///
+  /// \param[in] fragment the ParquetFileFragment to split.
+  /// \param[in] filter predicate; row groups whose statistics cannot satisfy
+  /// it will be excluded.
+  ///
+  /// \return An iterator of Fragments, one per selected row group.
   Result<FragmentIterator> GetRowGroupFragments(
       const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> extra_filter = scalar(true));
+      std::shared_ptr<Expression> filter = scalar(true));
 };
 
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
  public:
-  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanContext> context) override;
+  Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options,
+                                std::shared_ptr<ScanContext> context) override;
 
   /// \brief The row groups viewed by this Fragment. This may be empty, which signifies
   /// that all row groups are selected.
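To make the reworked row-group API concrete, here is a minimal usage sketch, not part of the patch: it assumes an existing ParquetFileFormat, FileSource, Schema and ScanContext, and reuses the "i64"_ expression literal that the tests in this diff use. With ScanOptions removed from Fragment, the same file can be split by one predicate and then scanned with independently chosen options.

#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/scanner.h"

namespace arrow {
namespace dataset {

// Sketch only: `format`, `source`, `physical_schema` and `ctx` are assumed to
// exist; error handling mirrors ARROW_ASSIGN_OR_RAISE usage elsewhere in this diff.
Status ScanMatchingRowGroups(std::shared_ptr<ParquetFileFormat> format,
                             FileSource source,
                             std::shared_ptr<Schema> physical_schema,
                             std::shared_ptr<ScanContext> ctx) {
  // Fragments are now created without ScanOptions.
  ARROW_ASSIGN_OR_RAISE(auto fragment, format->MakeFragment(std::move(source)));
  auto parquet_fragment =
      arrow::internal::checked_pointer_cast<ParquetFileFragment>(fragment);

  // One Fragment per row group; row groups whose statistics cannot satisfy
  // the filter are skipped. ("i64"_ is the expression literal used by the tests.)
  auto filter = ("i64"_ > int64_t(0)).Copy();
  ARROW_ASSIGN_OR_RAISE(auto row_group_fragments,
                        format->GetRowGroupFragments(*parquet_fragment, filter));

  // ScanOptions are supplied per scan rather than stored on the Fragment.
  auto options = ScanOptions::Make(physical_schema);
  options->filter = filter;

  for (auto maybe_fragment : row_group_fragments) {
    ARROW_ASSIGN_OR_RAISE(auto row_group_fragment, maybe_fragment);
    ARROW_ASSIGN_OR_RAISE(auto scan_tasks, row_group_fragment->Scan(options, ctx));
    // ... execute scan tasks ...
  }
  return Status::OK();
}

}  // namespace dataset
}  // namespace arrow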
@@ -126,10 +130,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { private: ParquetFileFragment(FileSource source, std::shared_ptr format, - std::shared_ptr scan_options, std::shared_ptr partition_expression, std::vector row_groups) - : FileFragment(std::move(source), std::move(format), std::move(scan_options), + : FileFragment(std::move(source), std::move(format), std::move(partition_expression)), row_groups_(std::move(row_groups)) {} diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 9a7c8fbfb45..ee65e50382c 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -149,7 +149,7 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); return Batches(std::move(scan_task_it)); } @@ -181,14 +181,10 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { void CountRowGroupsInFragment(const std::shared_ptr& fragment, std::vector expected_row_groups, - const Expression& filter, - const Expression& extra_filter = *scalar(true)) { - fragment->scan_options()->filter = filter.Copy(); - + const Expression& filter) { auto parquet_fragment = checked_pointer_cast(fragment); - ASSERT_OK_AND_ASSIGN( - auto row_group_fragments, - format_->GetRowGroupFragments(*parquet_fragment, extra_filter.Copy())); + ASSERT_OK_AND_ASSIGN(auto row_group_fragments, + format_->GetRowGroupFragments(*parquet_fragment, filter.Copy())); auto expected_row_group = expected_row_groups.begin(); for (auto maybe_fragment : row_group_fragments) { @@ -214,7 +210,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -235,9 +231,9 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { opts_ = ScanOptions::Make(reader->schema()); format_->reader_options.dict_columns = {"utf8"}; - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); int64_t row_count = 0; Schema expected_schema({field("utf8", dictionary(int32(), utf8()))}); @@ -283,7 +279,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -316,7 +312,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjectedMissingCols) { auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()}; for (auto reader : readers) { auto source = GetFileSource(reader); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); // NB: projector is applied by the scanner; ParquetFragment does not evaluate it. // We will not drop "i32" even though it is not in the projector's schema. 
@@ -404,7 +400,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); opts_->filter = scalar(true); CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups); @@ -444,7 +440,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); CountRowGroupsInFragment(fragment, internal::Iota(static_cast(kTotalNumRows)), *scalar(true)); @@ -460,7 +456,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { // No rows match 1 and 2. CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(1) and "u8"_ == uint8_t(2)); - CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(2), "i64"_ == int64_t(4)); + CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(2) and "i64"_ == int64_t(4)); CountRowGroupsInFragment(fragment, {1, 3}, "i64"_ == int64_t(2) or "i64"_ == int64_t(4)); @@ -470,8 +466,8 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast(kNumRowGroups)), "i64"_ >= int64_t(6)); - CountRowGroupsInFragment(fragment, {5, 6, 7}, "i64"_ >= int64_t(6), - "i64"_ < int64_t(8)); + CountRowGroupsInFragment(fragment, {5, 6, 7}, + "i64"_ >= int64_t(6) and "i64"_ < int64_t(8)); } TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { @@ -485,7 +481,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { auto row_groups_fragment = [&](std::vector row_groups) { EXPECT_OK_AND_ASSIGN(auto fragment, - format_->MakeFragment(*source, opts_, scalar(true), row_groups)); + format_->MakeFragment(*source, scalar(true), row_groups)); return internal::checked_pointer_cast(fragment); }; @@ -517,7 +513,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { EXPECT_RAISES_WITH_MESSAGE_THAT( IndexError, testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"), - row_groups_fragment({kNumRowGroups + 1})->Scan(ctx_)); + row_groups_fragment({kNumRowGroups + 1})->Scan(opts_, ctx_)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 699ba989587..ee2bdc5cee2 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -80,15 +80,15 @@ TEST(FileSource, BufferBased) { TEST_F(TestFileSystemDataset, Basic) { MakeDataset({}); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {}); MakeDataset({fs::File("a"), fs::File("b"), fs::File("c")}); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b", "c"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {"a", "b", "c"}); AssertFilesAre(dataset_, {"a", "b", "c"}); // Should not create fragment from directories. 
MakeDataset({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")}); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"A/a", "A/B/b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {"A/a", "A/B/b"}); AssertFilesAre(dataset_, {"A/a", "A/B/b"}); } @@ -121,27 +121,22 @@ TEST_F(TestFileSystemDataset, RootPartitionPruning) { MakeDataset({fs::File("a"), fs::File("b")}, root_partition); // Default filter should always return all data. - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {"a", "b"}); // filter == partition - options_->filter = root_partition; - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(root_partition), {"a", "b"}); // Same partition key, but non matching filter - options_->filter = ("a"_ == 6).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("a"_ == 6).Copy()), {}); - options_->filter = ("a"_ > 1).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("a"_ > 1).Copy()), {"a", "b"}); // different key shouldn't prune - options_->filter = ("b"_ == 6).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("b"_ == 6).Copy()), {"a", "b"}); // No partition should match MakeDataset({fs::File("a"), fs::File("b")}); - options_->filter = ("b"_ == 6).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("b"_ == 6).Copy()), {"a", "b"}); } TEST_F(TestFileSystemDataset, TreePartitionPruning) { @@ -165,21 +160,20 @@ TEST_F(TestFileSystemDataset, TreePartitionPruning) { std::vector franklins = {"CA/Franklin", "NY/Franklin"}; // Default filter should always return all data. - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), all_cities); + AssertFragmentsAreFromPath(dataset_->GetFragments(), all_cities); // Dataset's partitions are respected - options_->filter = ("country"_ == "US").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), all_cities); - options_->filter = ("country"_ == "FR").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("country"_ == "US").Copy()), + all_cities); + AssertFragmentsAreFromPath(dataset_->GetFragments(("country"_ == "FR").Copy()), {}); - options_->filter = ("state"_ == "CA").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), ca_cities); + AssertFragmentsAreFromPath(dataset_->GetFragments(("state"_ == "CA").Copy()), + ca_cities); // Filter where no decisions can be made on inner nodes when filter don't // apply to inner partitions. 
- options_->filter = ("city"_ == "Franklin").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), franklins); + AssertFragmentsAreFromPath(dataset_->GetFragments(("city"_ == "Franklin").Copy()), + franklins); } TEST_F(TestFileSystemDataset, FragmentPartitions) { @@ -202,7 +196,7 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) { }; AssertFragmentsHavePartitionExpressions( - dataset_->GetFragments(options_), + dataset_->GetFragments(), { with_root("state"_ == "CA", "city"_ == "San Francisco"), with_root("state"_ == "CA", "city"_ == "Franklin"), diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc index d81501d7e60..6488c0dabf0 100644 --- a/cpp/src/arrow/dataset/filter.cc +++ b/cpp/src/arrow/dataset/filter.cc @@ -49,8 +49,8 @@ namespace arrow { namespace dataset { using arrow::compute::Datum; -using internal::checked_cast; -using internal::checked_pointer_cast; +using arrow::internal::checked_cast; +using arrow::internal::checked_pointer_cast; inline std::shared_ptr NullExpression() { return std::make_shared(std::make_shared()); @@ -655,31 +655,32 @@ std::string ScalarExpression::ToString() const { return value_->ToString() + ":" + type_repr; } +using arrow::internal::JoinStrings; + std::string AndExpression::ToString() const { - return internal::JoinStrings( + return JoinStrings( {"(", left_operand_->ToString(), " and ", right_operand_->ToString(), ")"}, ""); } std::string OrExpression::ToString() const { - return internal::JoinStrings( + return JoinStrings( {"(", left_operand_->ToString(), " or ", right_operand_->ToString(), ")"}, ""); } std::string NotExpression::ToString() const { if (operand_->type() == ExpressionType::IS_VALID) { const auto& is_valid = checked_cast(*operand_); - return internal::JoinStrings({"(", is_valid.operand()->ToString(), " is null)"}, ""); + return JoinStrings({"(", is_valid.operand()->ToString(), " is null)"}, ""); } - return internal::JoinStrings({"(not ", operand_->ToString(), ")"}, ""); + return JoinStrings({"(not ", operand_->ToString(), ")"}, ""); } std::string IsValidExpression::ToString() const { - return internal::JoinStrings({"(", operand_->ToString(), " is not null)"}, ""); + return JoinStrings({"(", operand_->ToString(), " is not null)"}, ""); } std::string InExpression::ToString() const { - return internal::JoinStrings( - {"(", operand_->ToString(), " is in ", set_->ToString(), ")"}, ""); + return JoinStrings({"(", operand_->ToString(), " is in ", set_->ToString(), ")"}, ""); } std::string CastExpression::ToString() const { @@ -691,13 +692,13 @@ std::string CastExpression::ToString() const { auto like = arrow::util::get>(to_); to = " like " + like->ToString(); } - return internal::JoinStrings({"(cast ", operand_->ToString(), std::move(to), ")"}, ""); + return JoinStrings({"(cast ", operand_->ToString(), std::move(to), ")"}, ""); } std::string ComparisonExpression::ToString() const { - return internal::JoinStrings({"(", left_operand_->ToString(), " ", OperatorName(op()), - " ", right_operand_->ToString(), ")"}, - ""); + return JoinStrings({"(", left_operand_->ToString(), " ", OperatorName(op()), " ", + right_operand_->ToString(), ")"}, + ""); } bool UnaryExpression::Equals(const Expression& other) const { diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index e33b904bbfc..26f37b008fe 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -183,6 +183,11 @@ class ARROW_DS_EXPORT Expression { return Assume(*given); } + /// Indicates if the expression 
is satisfiable. + /// + /// This is a shortcut to check if the expression is neither null nor false. + bool IsSatisfiable() const { return !IsNull() && !Equals(false); } + /// returns a debug string representing this expression virtual std::string ToString() const = 0; diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 676eb752aa4..c72ae541e00 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -42,7 +42,7 @@ namespace dataset { using util::string_view; -using internal::checked_cast; +using arrow::internal::checked_cast; Result> Partitioning::Parse(const std::string& path) const { ExpressionVector expressions; @@ -64,13 +64,15 @@ std::shared_ptr Partitioning::Default() { return std::make_shared(); } -Result PartitioningFactory::MakeWritePlan(FragmentIterator fragment_it) { +Result PartitioningFactory::MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragment_it) { return Status::NotImplemented("MakeWritePlan from PartitioningFactory of type ", type_name()); } Result PartitioningFactory::MakeWritePlan( - FragmentIterator fragment_it, const std::shared_ptr& schema) { + std::shared_ptr schema, FragmentIterator fragment_it, + std::shared_ptr partition_schema) { return Status::NotImplemented("MakeWritePlan from PartitioningFactory of type ", type_name()); } @@ -297,10 +299,12 @@ class DirectoryPartitioningFactory : public PartitioningFactory { struct MakeWritePlanImpl; - Result MakeWritePlan(FragmentIterator fragments) override; + Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments) override; - Result MakeWritePlan(FragmentIterator fragments, - const std::shared_ptr& schema) override; + Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments, + std::shared_ptr partition_schema) override; private: std::vector field_names_; @@ -309,9 +313,10 @@ class DirectoryPartitioningFactory : public PartitioningFactory { struct DirectoryPartitioningFactory::MakeWritePlanImpl { using Indices = std::basic_string; - MakeWritePlanImpl(DirectoryPartitioningFactory* factory, + MakeWritePlanImpl(DirectoryPartitioningFactory* factory, std::shared_ptr schema, FragmentVector source_fragments) : this_(factory), + schema_(std::move(schema)), source_fragments_(std::move(source_fragments)), right_hand_sides_(source_fragments_.size(), Indices(num_fields(), -1)) {} @@ -416,23 +421,6 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { return std::to_string(milliseconds_since_epoch); } - // remove fields which will be implicit in the partitioning; writing them to files would - // be redundant - Status DropPartitionFields(const std::shared_ptr& partitioning, - Fragment* fragment) { - auto schema = fragment->schema(); - for (const auto& field : partitioning->schema()->fields()) { - int field_i = schema->GetFieldIndex(field->name()); - if (field_i != -1) { - ARROW_ASSIGN_OR_RAISE(schema, schema->RemoveField(field_i)); - } - } - - // the fragment being scanned to disk will now deselect redundant columns - fragment->scan_options()->projector = RecordBatchProjector(std::move(schema)); - return Status::OK(); - } - Result Finish(std::shared_ptr partitioning_schema = nullptr) && { WritePlan out; @@ -444,18 +432,17 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { ARROW_ASSIGN_OR_RAISE(out.partitioning, this_->Finish(std::move(partitioning_schema))); - auto fragment_schema = - source_fragments_.empty() ? 
schema({}) : source_fragments_.front()->schema(); + // There's no guarantee that all Fragments have the same schema. ARROW_ASSIGN_OR_RAISE(out.schema, - UnifySchemas({out.partitioning->schema(), fragment_schema})); + UnifySchemas({out.partitioning->schema(), schema_})); // Lexicographic ordering WRT right_hand_sides_ ensures that source_fragments_ are in // a depth first visitation order WRT their partition expressions. This makes // generation of the full directory tree far simpler since a directory's files are // grouped. - auto permutation = internal::ArgSort(right_hand_sides_); - internal::Permute(permutation, &source_fragments_); - internal::Permute(permutation, &right_hand_sides_); + auto permutation = arrow::internal::ArgSort(right_hand_sides_); + arrow::internal::Permute(permutation, &source_fragments_); + arrow::internal::Permute(permutation, &right_hand_sides_); // the basename of out.paths[i] is stored in segments[i] (full paths will be assembled // after segments is complete) @@ -473,9 +460,6 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { Indices current_parents(num_fields() + 1, -1); for (size_t fragment_i = 0; fragment_i < source_fragments_.size(); ++fragment_i) { - RETURN_NOT_OK( - DropPartitionFields(out.partitioning, source_fragments_[fragment_i].get())); - int field_i = 0; for (; field_i < num_fields(); ++field_i) { // these directories have already been created and we're still writing their @@ -528,6 +512,7 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { } DirectoryPartitioningFactory* this_; + std::shared_ptr schema_; FragmentVector source_fragments_; struct { @@ -552,15 +537,16 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { }; Result DirectoryPartitioningFactory::MakeWritePlan( - FragmentIterator fragment_it, const std::shared_ptr& schema) { + std::shared_ptr schema, FragmentIterator fragment_it, + std::shared_ptr partition_schema) { ARROW_ASSIGN_OR_RAISE(auto fragments, fragment_it.ToVector()); - return MakeWritePlanImpl(this, std::move(fragments)).Finish(schema); + return MakeWritePlanImpl(this, schema, std::move(fragments)).Finish(partition_schema); } Result DirectoryPartitioningFactory::MakeWritePlan( - FragmentIterator fragment_it) { + std::shared_ptr schema, FragmentIterator fragment_it) { ARROW_ASSIGN_OR_RAISE(auto fragments, fragment_it.ToVector()); - return MakeWritePlanImpl(this, std::move(fragments)).Finish(); + return MakeWritePlanImpl(this, schema, std::move(fragments)).Finish(); } std::shared_ptr DirectoryPartitioning::MakeFactory( diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 463d06cb40a..12edd75800c 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -104,10 +104,12 @@ class ARROW_DS_EXPORT PartitioningFactory { // FIXME(bkietz) Make these pure virtual /// Construct a WritePlan for the provided fragments - virtual Result MakeWritePlan(FragmentIterator fragments, - const std::shared_ptr& schema); + virtual Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments, + std::shared_ptr partition_schema); /// Construct a WritePlan for the provided fragments, inferring schema - virtual Result MakeWritePlan(FragmentIterator fragments); + virtual Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments); }; /// \brief Subclass for representing the default, a partitioning that returns diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index ec9e826afc2..a77c197d91c 
100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -304,8 +304,7 @@ class TestPartitioningWritePlan : public ::testing::Test { FragmentIterator MakeFragments(const ExpressionVector& partition_expressions) { fragments_.clear(); for (const auto& expr : partition_expressions) { - fragments_.emplace_back( - new InMemoryFragment(RecordBatchVector{}, scan_options_, expr)); + fragments_.emplace_back(new InMemoryFragment(RecordBatchVector{}, expr)); } return MakeVectorIterator(fragments_); } @@ -321,27 +320,30 @@ class TestPartitioningWritePlan : public ::testing::Test { template void MakeWritePlan(const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - EXPECT_OK_AND_ASSIGN(plan_, factory_->MakeWritePlan(std::move(fragments))); + EXPECT_OK_AND_ASSIGN(plan_, + factory_->MakeWritePlan(schema({}), std::move(fragments))); } template Status MakeWritePlanError(const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - return factory_->MakeWritePlan(std::move(fragments)).status(); + return factory_->MakeWritePlan(schema({}), std::move(fragments)).status(); } template - void MakeWritePlanWithSchema(const std::shared_ptr& schema, + void MakeWritePlanWithSchema(const std::shared_ptr& partition_schema, const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - EXPECT_OK_AND_ASSIGN(plan_, factory_->MakeWritePlan(std::move(fragments), schema)); + EXPECT_OK_AND_ASSIGN(plan_, factory_->MakeWritePlan(schema({}), std::move(fragments), + partition_schema)); } template - Status MakeWritePlanWithSchemaError(const std::shared_ptr& schema, + Status MakeWritePlanWithSchemaError(const std::shared_ptr& partition_schema, const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - return factory_->MakeWritePlan(std::move(fragments), schema).status(); + return factory_->MakeWritePlan(schema({}), std::move(fragments), partition_schema) + .status(); } struct ExpectedWritePlan { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index a78450ba219..9f017678260 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -73,21 +73,14 @@ FragmentIterator Scanner::GetFragments() { // Transform Datasets in a flat Iterator. This // iterator is lazily constructed, i.e. Dataset::GetFragments is // not invoked until a Fragment is requested. - return GetFragmentsFromDatasets({dataset_}, scan_options_); + return GetFragmentsFromDatasets({dataset_}, scan_options_->filter); } Result Scanner::Scan() { // Transforms Iterator into a unified // Iterator. The first Iterator::Next invocation is going to do // all the work of unwinding the chained iterators. 
- auto scan_task_it = GetScanTaskIterator(GetFragments(), scan_context_); - - // Apply the filter and/or projection to incoming RecordBatches by - // wrapping the ScanTask with a FilterAndProjectScanTask - auto wrap_scan_task = [](std::shared_ptr task) -> std::shared_ptr { - return std::make_shared(std::move(task)); - }; - return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); + return GetScanTaskIterator(GetFragments(), scan_options_, scan_context_); } Result ScanTaskIteratorFromRecordBatch( @@ -101,9 +94,18 @@ Result ScanTaskIteratorFromRecordBatch( ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context) : dataset_(std::move(dataset)), + fragment_(nullptr), scan_options_(ScanOptions::Make(dataset_->schema())), scan_context_(std::move(scan_context)) {} +ScannerBuilder::ScannerBuilder(std::shared_ptr schema, + std::shared_ptr fragment, + std::shared_ptr scan_context) + : dataset_(nullptr), + fragment_(std::move(fragment)), + scan_options_(ScanOptions::Make(schema)), + scan_context_(std::move(scan_context)) {} + Status ScannerBuilder::Project(std::vector columns) { RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(columns)); has_projection_ = true; @@ -146,15 +148,21 @@ Result> ScannerBuilder::Finish() const { scan_options->evaluator = std::make_shared(); } + if (dataset_ == nullptr) { + return std::make_shared(fragment_, std::move(scan_options), scan_context_); + } + return std::make_shared(dataset_, std::move(scan_options), scan_context_); } -std::shared_ptr ScanContext::TaskGroup() const { +using arrow::internal::TaskGroup; + +std::shared_ptr ScanContext::TaskGroup() const { if (use_threads) { - internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool(); - return internal::TaskGroup::MakeThreaded(thread_pool); + auto* thread_pool = arrow::internal::GetCpuThreadPool(); + return TaskGroup::MakeThreaded(thread_pool); } - return internal::TaskGroup::MakeSerial(); + return TaskGroup::MakeSerial(); } Result> Scanner::ToTable() { diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index e94efd4874a..238b8d0c490 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -143,13 +143,12 @@ ARROW_DS_EXPORT Result ScanTaskIteratorFromRecordBatch( /// \brief Scanner is a materialized scan operation with context and options /// bound. A scanner is the class that glues ScanTask, Fragment, -/// and Source. In python pseudo code, it performs the following: +/// and Dataset. In python pseudo code, it performs the following: /// /// def Scan(): -/// for source in this.sources_: -/// for fragment in source.GetFragments(this.options_): -/// for scan_task in fragment.Scan(this.context_): -/// yield scan_task +/// for fragment in self.dataset.GetFragments(this.options.filter): +/// for scan_task in fragment.Scan(this.options): +/// yield scan_task class ARROW_DS_EXPORT Scanner { public: Scanner(std::shared_ptr dataset, std::shared_ptr scan_options, @@ -158,9 +157,10 @@ class ARROW_DS_EXPORT Scanner { scan_options_(std::move(scan_options)), scan_context_(std::move(scan_context)) {} - Scanner(std::shared_ptr fragment, std::shared_ptr scan_context) + Scanner(std::shared_ptr fragment, std::shared_ptr scan_options, + std::shared_ptr scan_context) : fragment_(std::move(fragment)), - scan_options_(fragment_->scan_options()), + scan_options_(std::move(scan_options)), scan_context_(std::move(scan_context)) {} /// \brief The Scan operator returns a stream of ScanTask. 
The caller is @@ -199,6 +199,9 @@ class ARROW_DS_EXPORT ScannerBuilder { ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context); + ScannerBuilder(std::shared_ptr schema, std::shared_ptr fragment, + std::shared_ptr scan_context); + /// \brief Set the subset of columns to materialize. /// /// This subset will be passed down to Sources and corresponding Fragments. @@ -240,10 +243,11 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Return the constructed now-immutable Scanner object Result> Finish() const; - std::shared_ptr schema() const { return dataset_->schema(); } + std::shared_ptr schema() const { return scan_options_->schema(); } private: std::shared_ptr dataset_; + std::shared_ptr fragment_; std::shared_ptr scan_options_; std::shared_ptr scan_context_; bool has_projection_ = false; diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 0cbd4a55d2a..0aad0d63216 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -22,6 +22,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/filter.h" +#include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" namespace arrow { @@ -55,28 +56,55 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, class FilterAndProjectScanTask : public ScanTask { public: - explicit FilterAndProjectScanTask(std::shared_ptr task) - : ScanTask(task->options(), task->context()), task_(std::move(task)) {} + explicit FilterAndProjectScanTask(std::shared_ptr task, + std::shared_ptr partition) + : ScanTask(task->options(), task->context()), + task_(std::move(task)), + partition_(std::move(partition)), + filter_(NULLPTR), + projector_(options()->projector) {} Result Execute() override { ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); - auto filter_it = FilterRecordBatch(std::move(it), *options_->evaluator, - *options_->filter, context_->pool); - return ProjectRecordBatch(std::move(filter_it), &task_->options()->projector, - context_->pool); + + filter_ = options()->filter->Assume(partition_); + auto filter_it = + FilterRecordBatch(std::move(it), *options_->evaluator, *filter_, context_->pool); + + if (partition_) { + RETURN_NOT_OK( + KeyValuePartitioning::SetDefaultValuesFromKeys(*partition_, &projector_)); + } + return ProjectRecordBatch(std::move(filter_it), &projector_, context_->pool); } private: std::shared_ptr task_; + std::shared_ptr partition_; + std::shared_ptr filter_; + RecordBatchProjector projector_; }; /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. 
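`FilterAndProjectScanTask` now simplifies the scan filter against the fragment's partition expression and injects the partition keys through the projector, so predicates on partition columns can be satisfied even though the files do not store those columns. A hedged sketch of the user-visible effect, assuming a hive-style dataset partitioned on a string column `part` whose files only contain `f1` and `f2`:

```python
import pyarrow.dataset as ds

# The predicate on 'part' is absorbed by Assume(partition_expression);
# the 'part' values in the result come from the partition keys, not
# from the Parquet files themselves.
table = dataset.to_table(filter=ds.field('part') == 'a')
assert set(table.column('part').to_pylist()) == {'a'}
```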
inline ScanTaskIterator GetScanTaskIterator(FragmentIterator fragments, + std::shared_ptr options, std::shared_ptr context) { // Fragment -> ScanTaskIterator - auto fn = [context](std::shared_ptr fragment) { - return fragment->Scan(context); + auto fn = [options, + context](std::shared_ptr fragment) -> Result { + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options, context)); + + auto partition = fragment->partition_expression(); + // Apply the filter and/or projection to incoming RecordBatches by + // wrapping the ScanTask with a FilterAndProjectScanTask + auto wrap_scan_task = + [partition](std::shared_ptr task) -> std::shared_ptr { + return std::make_shared(std::move(task), + std::move(partition)); + }; + + return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); }; // Iterator> diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index b7feaef2b46..8ca055bf3b4 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -119,7 +119,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(ctx_)); + ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_, ctx_)); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); @@ -135,7 +135,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragments of a dataset. void AssertDatasetFragmentsEqual(RecordBatchReader* expected, Dataset* dataset, bool ensure_drained = true) { - auto it = dataset->GetFragments(options_); + auto it = dataset->GetFragments(options_->filter); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 510491319f0..8477d5e1722 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -104,27 +104,44 @@ cdef class Dataset: self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) return Dataset.wrap(move(copy)) - def get_fragments(self, columns=None, filter=None): + def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. Parameters ---------- - columns : list of str, default None - List of columns to project. filter : Expression, default None - Scan will return only the rows matching the filter. + Return fragments matching the optional filter, either using the + partition_expression or internal information like Parquet's + statistics. Returns ------- fragments : iterator of Fragment """ - return Scanner(self, columns=columns, filter=filter).get_fragments() + cdef: + CFragmentIterator iterator + shared_ptr[CFragment] fragment + + if filter is None or filter.expr == nullptr: + iterator = self.dataset.GetFragments() + else: + iterator = self.dataset.GetFragments(filter.unwrap()) - def scan(self, columns=None, filter=None, MemoryPool memory_pool=None): + while True: + fragment = GetResultValue(iterator.Next()) + if fragment.get() == nullptr: + raise StopIteration() + else: + yield Fragment.wrap(fragment) + + def _scanner(self, **kwargs): + return Scanner.from_dataset(self, **kwargs) + + def scan(self, **kwargs): """Builds a scan operation against the dataset. 
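`Dataset.get_fragments` is now a plain generator over `Fragment` objects and accepts only an optional filter; column projection moved to the scan methods. A minimal usage sketch (the partition column name is illustrative):

```python
import pyarrow.dataset as ds

# Lazily iterate fragments; each one carries its partition expression.
for fragment in dataset.get_fragments():
    print(fragment.partition_expression)

# With a predicate, fragments whose partition expression cannot satisfy
# it are pruned before any data is touched.
matching = list(dataset.get_fragments(filter=ds.field('part') == 'a'))
```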
- It poduces a stream of ScanTasks which is meant to be a unit of work to - be dispatched. The tasks are not executed automatically, the user is + It produces a stream of ScanTasks which is meant to be a unit of work + to be dispatched. The tasks are not executed automatically, the user is responsible to execute and dispatch the individual tasks, so custom local task scheduling can be implemented. @@ -141,9 +158,16 @@ cdef class Dataset: filter : Expression, default None Scan will return only the rows matching the filter. If possible the predicate will be pushed down to exploit the - partition information or internal metadata found in fragments, - e.g. Parquet statistics. Otherwise filters the loaded + partition information or internal metadata found in the data + source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them. + batch_size : int, default 32K + The maximum row count for scanned record batches. If scanned + record batches are overflowing memory then this method can be + called to reduce their size. + use_threads : bool, default True + If enabled, then maximum parallelism will be used determined by + the number of available CPU cores. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -152,84 +176,35 @@ cdef class Dataset: ------- scan_tasks : iterator of ScanTask """ - scanner = Scanner(self, columns=columns, filter=filter, - memory_pool=memory_pool) - return scanner.scan() + return self._scanner(**kwargs).scan() - def to_batches(self, columns=None, filter=None, - MemoryPool memory_pool=None): + def to_batches(self, **kwargs): """Read the dataset as materialized record batches. Builds a scan operation against the dataset and sequentially executes the ScanTasks as the returned generator gets consumed. - Parameters - ---------- - columns : list of str, default None - List of columns to project. Order and duplicates will be preserved. - The columns will be passed down to Datasets and corresponding data - fragments to avoid loading, copying, and deserializing columns - that will not be required further down the compute chain. - By default all of the available columns are projected. Raises - an exception if any of the referenced column names does not exist - in the dataset's Schema. - filter : Expression, default None - Scan will return only the rows matching the filter. - If possible the predicate will be pushed down to exploit the - partition information or internal metadata found in the fragments, - e.g. Parquet statistics. Otherwise filters the loaded - RecordBatches before yielding them. - memory_pool : MemoryPool, default None - For memory allocations, if required. If not specified, uses the - default pool. + See scan method parameters documentation. Returns ------- record_batches : iterator of RecordBatch """ - scanner = Scanner(self, columns=columns, filter=filter, - memory_pool=memory_pool) - for task in scanner.scan(): - for batch in task.execute(): - yield batch + return self._scanner(**kwargs).to_batches() - def to_table(self, columns=None, filter=None, use_threads=True, - MemoryPool memory_pool=None): + def to_table(self, **kwargs): """Read the dataset to an arrow table. Note that this method reads all the selected data from the dataset into memory. - Parameters - ---------- - columns : list of str, default None - List of columns to project. Order and duplicates will be preserved. 
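`scan`, `to_batches` and `to_table` now share one set of scanner keyword arguments, which is why the per-method parameter lists collapse into the `scan` docstring. A short sketch of the consolidated call, with illustrative column names; `batch_size` bounds the row count of each yielded batch:

```python
import pyarrow.dataset as ds

for batch in dataset.to_batches(columns=['f1', 'f2'],
                                filter=ds.field('f1') < 2,
                                batch_size=1024,
                                use_threads=False):
    # Fragments are sliced so that no batch exceeds batch_size rows.
    assert batch.num_rows <= 1024
```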
- The columns will be passed down to Datasets and corresponding data - fragments to avoid loading, copying, and deserializing columns - that will not be required further down the compute chain. - By default all of the available columns are projected. Raises - an exception if any of the referenced column names does not exist - in the dataset's Schema. - filter : Expression, default None - Scan will return only the rows matching the filter. - If possible the predicate will be pushed down to exploit the - partition information or internal metadata found in the fragments, - e.g. Parquet statistics. Otherwise filters the loaded - RecordBatches before yielding them. - use_threads : boolean, default True - If enabled, then maximum parallelism will be used determined by - the number of available CPU cores. - memory_pool : MemoryPool, default None - For memory allocations, if required. If not specified, uses the - default pool. + See scan method parameters documentation. Returns ------- table : Table instance """ - scanner = Scanner(self, columns=columns, filter=filter, - use_threads=use_threads, memory_pool=memory_pool) - return scanner.to_table() + return self._scanner(**kwargs).to_table() @property def schema(self): @@ -378,30 +353,6 @@ cdef shared_ptr[CExpression] _insert_implicit_casts(Expression filter, ) -cdef shared_ptr[CScanOptions] _make_scan_options( - Schema schema, Expression partition_expression, object columns=None, - Expression filter=None) except *: - cdef: - shared_ptr[CScanOptions] options - CExpression* c_partition_expression - - assert schema is not None - assert partition_expression is not None - - filter = Expression.wrap(_insert_implicit_casts(filter, schema)) - filter = filter.assume(partition_expression) - - empty_dataset = UnionDataset(schema, children=[]) - scanner = Scanner(empty_dataset, columns=columns, filter=filter) - options = scanner.unwrap().get().options() - - c_partition_expression = partition_expression.unwrap().get() - check_status(CSetPartitionKeysInProjector(deref(c_partition_expression), - &options.get().projector)) - - return options - - cdef class FileFormat: cdef: @@ -443,7 +394,6 @@ cdef class FileFormat: return pyarrow_wrap_schema(move(c_schema)) def make_fragment(self, str path not None, FileSystem filesystem not None, - Schema schema=None, columns=None, filter=None, Expression partition_expression=ScalarExpression(True)): """ Make a FileFragment of this FileFormat. The filter may not reference @@ -451,19 +401,11 @@ cdef class FileFormat: one will be inferred. """ cdef: - shared_ptr[CScanOptions] c_options shared_ptr[CFileFragment] c_fragment - if schema is None: - schema = self.inspect(path, filesystem) - - c_options = _make_scan_options(schema, partition_expression, - columns, filter) - c_fragment = GetResultValue( self.format.MakeFragment(CFileSource(tobytes(path), filesystem.unwrap().get()), - move(c_options), partition_expression.unwrap())) return Fragment.wrap( move(c_fragment)) @@ -508,9 +450,14 @@ cdef class Fragment: return self.wrapped @property - def schema(self): - """Return the schema of batches scanned from this Fragment.""" - return pyarrow_wrap_schema(self.fragment.schema()) + def physical_schema(self): + """Return the physical schema of this Fragment. 
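`FileFormat.make_fragment` no longer accepts a schema, columns or filter; a fragment is reconstructed from just its path, filesystem and partition expression, and the read schema is supplied at scan time instead. A sketch mirroring the reconstruction test further down, where `fragment` is an existing `FileFragment`:

```python
# fragment.format is the FileFormat that produced the fragment
# (for Parquet files, equivalently ds.ParquetFileFormat()).
new_fragment = fragment.format.make_fragment(
    fragment.path, fragment.filesystem,
    partition_expression=fragment.partition_expression)

# Reads the same data as the original fragment.
assert new_fragment.to_table().equals(fragment.to_table())
```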
This schema can be + different from the dataset read schema.""" + cdef: + shared_ptr[CSchema] c_schema + + c_schema = GetResultValue(self.fragment.ReadPhysicalSchema()) + return pyarrow_wrap_schema(c_schema) @property def partition_expression(self): @@ -519,30 +466,78 @@ cdef class Fragment: """ return Expression.wrap(self.fragment.partition_expression()) - def to_table(self, use_threads=True, MemoryPool memory_pool=None): - """Convert this Fragment into a Table. + def _scanner(self, **kwargs): + return Scanner.from_fragment(self, **kwargs) - Use this convenience utility with care. This will serially materialize - the Scan result in memory before creating the Table. + def scan(self, Schema schema=None, **kwargs): + """Builds a scan operation against the dataset. + + It produces a stream of ScanTasks which is meant to be a unit of work + to be dispatched. The tasks are not executed automatically, the user is + responsible to execute and dispatch the individual tasks, so custom + local task scheduling can be implemented. + + Parameters + ---------- + schema : Schema + Schema to use for scanning. This is used to unify a Fragment to + it's Dataset's schema. If not specified this will use the + Fragment's physical schema which might differ for each Fragment. + columns : list of str, default None + List of columns to project. Order and duplicates will be preserved. + The columns will be passed down to Datasets and corresponding data + fragments to avoid loading, copying, and deserializing columns + that will not be required further down the compute chain. + By default all of the available columns are projected. Raises + an exception if any of the referenced column names does not exist + in the dataset's Schema. + filter : Expression, default None + Scan will return only the rows matching the filter. + If possible the predicate will be pushed down to exploit the + partition information or internal metadata found in the data + source, e.g. Parquet statistics. Otherwise filters the loaded + RecordBatches before yielding them. + batch_size : int, default 32K + The maximum row count for scanned record batches. If scanned + record batches are overflowing memory then this method can be + called to reduce their size. + use_threads : bool, default True + If enabled, then maximum parallelism will be used determined by + the number of available CPU cores. + memory_pool : MemoryPool, default None + For memory allocations, if required. If not specified, uses the + default pool. Returns ------- - table : Table + scan_tasks : iterator of ScanTask """ - scanner = Scanner._from_fragment(self, use_threads, memory_pool) - return scanner.to_table() + return self._scanner(schema=schema, **kwargs).scan() - def scan(self, MemoryPool memory_pool=None): - """Returns a stream of ScanTasks + def to_batches(self, Schema schema=None, **kwargs): + """Read the fragment as materialized record batches. - The caller is responsible to dispatch/schedule said tasks. Tasks should - be safe to run in a concurrent fashion and outlive the iterator. + See scan method parameters documentation. Returns ------- - scan_tasks : iterator of ScanTask + record_batches : iterator of RecordBatch + """ + return self._scanner(schema=schema, **kwargs).to_batches() + + def to_table(self, Schema schema=None, **kwargs): + """Convert this Fragment into a Table. + + Use this convenience utility with care. This will serially materialize + the Scan result in memory before creating the Table. + + See scan method parameters documentation. 
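Because the read schema is no longer baked into the fragment, `Fragment.to_table()` defaults to the physical schema; passing `schema=` unifies the fragment with its dataset and appends the partition columns. A sketch matching the fragment tests below (column names taken from those tests):

```python
f = next(dataset.get_fragments())

print(f.physical_schema.names)       # ['f1', 'f2'], no partition column
print(f.to_table().column_names)     # ['f1', 'f2']

# Unify against the dataset schema to get the partition column back.
print(f.to_table(schema=dataset.schema).column_names)  # ['f1', 'f2', 'part']
```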
+ + Returns + ------- + table : Table """ - return Scanner._from_fragment(self, memory_pool).scan() + return self._scanner(schema=schema, **kwargs).to_table() cdef class FileFragment(Fragment): @@ -627,7 +622,8 @@ cdef class ParquetFileFragment(FileFragment): shared_ptr[CExpression] c_extra_filter shared_ptr[CFragment] c_fragment - c_extra_filter = _insert_implicit_casts(extra_filter, self.schema) + schema = self.physical_schema + c_extra_filter = _insert_implicit_casts(extra_filter, schema) c_format = self.file_fragment.format().get() c_iterator = move(GetResultValue(c_format.GetRowGroupFragments(deref( self.parquet_file_fragment), move(c_extra_filter)))) @@ -734,31 +730,23 @@ cdef class ParquetFileFormat(FileFormat): return ParquetFileFormat, (self.read_options,) def make_fragment(self, str path not None, FileSystem filesystem not None, - Schema schema=None, columns=None, filter=None, Expression partition_expression=ScalarExpression(True), row_groups=None): cdef: - shared_ptr[CScanOptions] c_options shared_ptr[CFileFragment] c_fragment vector[int] c_row_groups if row_groups is None: - return super().make_fragment(path, filesystem, schema, columns, - filter, partition_expression) + return super().make_fragment(path, filesystem, + partition_expression) + for row_group in set(row_groups): c_row_groups.push_back( row_group) - if schema is None: - schema = self.inspect(path, filesystem) - - c_options = _make_scan_options(schema, partition_expression, - columns, filter) - c_fragment = GetResultValue( self.parquet_format.MakeFragment(CFileSource(tobytes(path), filesystem.unwrap() .get()), - move(c_options), partition_expression.unwrap(), move(c_row_groups))) return Fragment.wrap( move(c_fragment)) @@ -1321,6 +1309,35 @@ cdef class ScanTask: yield pyarrow_wrap_batch(record_batch) +cdef shared_ptr[CScanContext] _build_scan_context(bint use_threads=True, + MemoryPool memory_pool=None): + cdef: + shared_ptr[CScanContext] context + + context = make_shared[CScanContext]() + context.get().pool = maybe_unbox_memory_pool(memory_pool) + if use_threads is not None: + context.get().use_threads = use_threads + + return context + + +cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, + list columns=None, Expression filter=None, + int batch_size=32*2**10) except *: + cdef: + CScannerBuilder *builder + + builder = ptr.get() + if columns is not None: + check_status(builder.Project([tobytes(c) for c in columns])) + + check_status(builder.Filter(_insert_implicit_casts( + filter, pyarrow_wrap_schema(builder.schema())))) + + check_status(builder.BatchSize(batch_size)) + + cdef class Scanner: """A materialized scan operation with context and options bound. @@ -1345,13 +1362,13 @@ cdef class Scanner: partition information or internal metadata found in the data source, e.g. Parquet statistics. Otherwise filters the loaded RecordBatches before yielding them. - use_threads : bool, default True - If enabled, then maximum parallelism will be used determined by - the number of available CPU cores. batch_size : int, default 32K The maximum row count for scanned record batches. If scanned record batches are overflowing memory then this method can be called to reduce their size. + use_threads : bool, default True + If enabled, then maximum parallelism will be used determined by + the number of available CPU cores. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. 
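`ParquetFileFormat.make_fragment` keeps its `row_groups` argument, so a fragment can be narrowed to specific row groups without any scan-options object. Note that the row-group tests below are skipped pending ARROW-8318, so treat this as a sketch of the intended API rather than verified behaviour at this point in the change:

```python
# Per-row-group fragments of an existing parquet fragment.
row_group_fragments = list(fragment.get_row_group_fragments())

# Or rebuild the fragment restricted to row group 0 only.
rg0 = fragment.format.make_fragment(
    fragment.path, fragment.filesystem,
    partition_expression=fragment.partition_expression,
    row_groups=[0])
print(rg0.row_groups)   # {0}
```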
@@ -1361,39 +1378,8 @@ cdef class Scanner: shared_ptr[CScanner] wrapped CScanner* scanner - def __init__(self, Dataset dataset, list columns=None, - Expression filter=None, bint use_threads=True, - int batch_size=32*2**10, MemoryPool memory_pool=None): - cdef: - shared_ptr[CScanContext] context - shared_ptr[CScannerBuilder] builder - shared_ptr[CExpression] filter_expression - vector[c_string] columns_to_project - - # create scan context - context = make_shared[CScanContext]() - context.get().pool = maybe_unbox_memory_pool(memory_pool) - if use_threads is not None: - context.get().use_threads = use_threads - - # create scanner builder - builder = GetResultValue( - dataset.unwrap().get().NewScanWithContext(context) - ) - - # set the builder's properties - if columns is not None: - check_status(builder.get().Project([tobytes(c) for c in columns])) - - check_status(builder.get().Filter(_insert_implicit_casts( - filter, pyarrow_wrap_schema(builder.get().schema()) - ))) - - check_status(builder.get().BatchSize(batch_size)) - - # instantiate the scanner object - scanner = GetResultValue(builder.get().Finish()) - self.init(scanner) + def __init__(self): + _forbid_instantiation(self.__class__) cdef void init(self, const shared_ptr[CScanner]& sp): self.wrapped = sp @@ -1405,20 +1391,51 @@ cdef class Scanner: self.init(sp) return self + cdef inline shared_ptr[CScanner] unwrap(self): + return self.wrapped + @staticmethod - def _from_fragment(Fragment fragment not None, bint use_threads=True, - MemoryPool memory_pool=None): + def from_dataset(Dataset dataset not None, + bint use_threads=True, MemoryPool memory_pool=None, + list columns=None, Expression filter=None, + int batch_size=32*2**10): cdef: shared_ptr[CScanContext] context + shared_ptr[CScannerBuilder] builder + shared_ptr[CScanner] scanner - context = make_shared[CScanContext]() - context.get().pool = maybe_unbox_memory_pool(memory_pool) - context.get().use_threads = use_threads + context = _build_scan_context(use_threads=use_threads, + memory_pool=memory_pool) + builder = make_shared[CScannerBuilder](dataset.unwrap(), context) + _populate_builder(builder, columns=columns, filter=filter, + batch_size=batch_size) - return Scanner.wrap(make_shared[CScanner](fragment.unwrap(), context)) + scanner = GetResultValue(builder.get().Finish()) + return Scanner.wrap(scanner) - cdef inline shared_ptr[CScanner] unwrap(self): - return self.wrapped + @staticmethod + def from_fragment(Fragment fragment not None, Schema schema=None, + bint use_threads=True, MemoryPool memory_pool=None, + list columns=None, Expression filter=None, + int batch_size=32*2**10): + cdef: + shared_ptr[CScanContext] context + shared_ptr[CScannerBuilder] builder + shared_ptr[CScanner] scanner + + context = _build_scan_context(use_threads=use_threads, + memory_pool=memory_pool) + + if schema is None: + schema = fragment.physical_schema + + builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema), + fragment.unwrap(), context) + _populate_builder(builder, columns=columns, filter=filter, + batch_size=batch_size) + + scanner = GetResultValue(builder.get().Finish()) + return Scanner.wrap(scanner) def scan(self): """Returns a stream of ScanTasks @@ -1443,6 +1460,20 @@ cdef class Scanner: else: yield ScanTask.wrap(task) + def to_batches(self): + """Consume a Scanner in record batches. + + Sequentially executes the ScanTasks as the returned generator gets + consumed. 
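With `Scanner.__init__` disabled, scanners are built through the two factory methods: `from_dataset` binds a whole dataset, while `from_fragment` scans a single fragment, optionally against a unified schema. Illustrative usage (column names are placeholders):

```python
import pyarrow as pa
import pyarrow.dataset as ds

scanner = ds.Scanner.from_dataset(dataset,
                                  columns=['f1'],
                                  memory_pool=pa.default_memory_pool())
table = scanner.to_table()

fragment = next(dataset.get_fragments())
frag_scanner = ds.Scanner.from_fragment(fragment,
                                        schema=dataset.schema,
                                        batch_size=4096)
for batch in frag_scanner.to_batches():
    pass
```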
+ + Returns + ------- + record_batches : iterator of RecordBatch + """ + for task in self.scan(): + for batch in task.execute(): + yield batch + def to_table(self): """Convert a Scanner into a Table. diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index cd96d40d466..bc2ef32659c 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -137,6 +137,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScanOptions "arrow::dataset::ScanOptions": CRecordBatchProjector projector + @staticmethod + shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) cdef cppclass CScanContext "arrow::dataset::ScanContext": c_bool use_threads @@ -149,8 +151,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[CRecordBatchIterator] Execute() cdef cppclass CFragment "arrow::dataset::Fragment": - CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context) - const shared_ptr[CSchema]& schema() const + CResult[shared_ptr[CSchema]] ReadPhysicalSchema() + CResult[CScanTaskIterator] Scan( + shared_ptr[CScanOptions] options, shared_ptr[CScanContext] context) c_bool splittable() const c_string type_name() const const shared_ptr[CExpression]& partition_expression() const @@ -162,7 +165,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::FragmentIterator" cdef cppclass CScanner "arrow::dataset::Scanner": - CScanner(shared_ptr[CFragment], shared_ptr[CScanContext]) + CScanner(shared_ptr[CDataset], shared_ptr[CScanOptions], + shared_ptr[CScanContext]) + CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions], + shared_ptr[CScanContext]) CResult[CScanTaskIterator] Scan() CResult[shared_ptr[CTable]] ToTable() CFragmentIterator GetFragments() @@ -171,6 +177,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder": CScannerBuilder(shared_ptr[CDataset], shared_ptr[CScanContext] scan_context) + CScannerBuilder(shared_ptr[CSchema], shared_ptr[CFragment], + shared_ptr[CScanContext] scan_context) CStatus Project(const vector[c_string]& columns) CStatus Filter(const CExpression& filter) CStatus Filter(shared_ptr[CExpression] filter) @@ -184,6 +192,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CDataset "arrow::dataset::Dataset": const shared_ptr[CSchema] & schema() + CFragmentIterator GetFragments() + CFragmentIterator GetFragments(shared_ptr[CExpression] predicate) const shared_ptr[CExpression] & partition_expression() c_string type_name() @@ -232,7 +242,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const CResult[shared_ptr[CFileFragment]] MakeFragment( CFileSource source, - shared_ptr[CScanOptions] options, shared_ptr[CExpression] partition_expression) cdef cppclass CFileFragment "arrow::dataset::FileFragment"( @@ -272,7 +281,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CExpression] extra_filter) CResult[shared_ptr[CFileFragment]] MakeFragment( CFileSource source, - shared_ptr[CScanOptions] options, shared_ptr[CExpression] partition_expression, vector[int] row_groups) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 8d0a21e02eb..951de2adfb6 100644 --- a/python/pyarrow/tests/test_dataset.py +++ 
b/python/pyarrow/tests/test_dataset.py @@ -270,11 +270,8 @@ def test_filesystem_dataset(mockfs): assert row_group_fragments[0].path == path assert row_group_fragments[0].row_groups == {0} - # test predicate pushdown using row group metadata fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) assert len(fragments) == 2 - assert len(list(fragments[0].get_row_group_fragments())) == 1 - assert len(list(fragments[1].get_row_group_fragments())) == 0 def test_dataset(dataset): @@ -298,8 +295,7 @@ def test_dataset(dataset): assert len(table) == 10 condition = ds.field('i64') == 1 - scanner = ds.Scanner(dataset, use_threads=True, filter=condition) - result = scanner.to_table().to_pydict() + result = dataset.to_table(use_threads=True, filter=condition).to_pydict() # don't rely on the scanning order assert result['i64'] == [1, 1] @@ -309,15 +305,16 @@ def test_dataset(dataset): def test_scanner(dataset): - scanner = ds.Scanner(dataset, memory_pool=pa.default_memory_pool()) + scanner = ds.Scanner.from_dataset(dataset, + memory_pool=pa.default_memory_pool()) assert isinstance(scanner, ds.Scanner) assert len(list(scanner.scan())) == 2 with pytest.raises(pa.ArrowInvalid): - dataset.scan(columns=['unknown']) + ds.Scanner.from_dataset(dataset, columns=['unknown']) - scanner = ds.Scanner(dataset, columns=['i64'], - memory_pool=pa.default_memory_pool()) + scanner = ds.Scanner.from_dataset(dataset, columns=['i64'], + memory_pool=pa.default_memory_pool()) assert isinstance(scanner, ds.Scanner) assert len(list(scanner.scan())) == 2 @@ -601,7 +598,7 @@ def test_filesystem_factory(mockfs, paths_or_selector): assert isinstance(dataset, ds.FileSystemDataset) assert len(list(dataset.scan())) == 2 - scanner = ds.Scanner(dataset) + scanner = ds.Scanner.from_dataset(dataset) expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) expected_str = pa.DictionaryArray.from_arrays( @@ -651,13 +648,14 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None): [range(8), [1] * 8, ['a'] * 4 + ['b'] * 4], names=['f1', 'f2', 'part'] ) + + path = str(tempdir / "test_parquet_dataset") + # write_to_dataset currently requires pandas - pq.write_to_dataset(table, str(tempdir / "test_parquet_dataset"), + pq.write_to_dataset(table, path, partition_cols=["part"], chunk_size=chunk_size) - dataset = ds.dataset(str(tempdir / "test_parquet_dataset/"), - format="parquet", partitioning="hive") - return table, dataset + return table, ds.dataset(path, format="parquet", partitioning="hive") @pytest.mark.pandas @@ -670,45 +668,41 @@ def test_fragments(tempdir): assert len(fragments) == 2 f = fragments[0] + physical_names = ['f1', 'f2'] # file's schema does not include partition column - phys_schema = f.schema.remove(f.schema.get_field_index('part')) - assert f.format.inspect(f.path, f.filesystem) == phys_schema + assert f.physical_schema.names == physical_names + assert f.format.inspect(f.path, f.filesystem) == f.physical_schema assert f.partition_expression.equals(ds.field('part') == 'a') - # scanning fragment includes partition columns + # By default, the partition column is not part of the schema. result = f.to_table() - assert f.schema == result.schema + assert result.column_names == physical_names + assert result.equals(table.remove_column(2).slice(0, 4)) + + # scanning fragment includes partition columns when given the proper + # schema. 
+ result = f.to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] - assert len(result) == 4 assert result.equals(table.slice(0, 4)) - - # scanning fragments follow column projection - fragments = list(dataset.get_fragments(columns=['f1', 'part'])) - assert len(fragments) == 2 - result = fragments[0].to_table() - assert result.column_names == ['f1', 'part'] - assert len(result) == 4 + assert f.physical_schema == result.schema.remove(2) # scanning fragments follow filter predicate - fragments = list(dataset.get_fragments(filter=ds.field('f1') < 2)) - assert len(fragments) == 2 - result = fragments[0].to_table() + result = f.to_table(schema=dataset.schema, filter=ds.field('f1') < 2) assert result.column_names == ['f1', 'f2', 'part'] - assert len(result) == 2 - result = fragments[1].to_table() - assert len(result) == 0 +@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_reconstruct(tempdir): table, dataset = _create_dataset_for_fragments(tempdir) - def assert_yields_projected(fragment, row_slice, columns): - actual = fragment.to_table() - assert actual.column_names == columns + def assert_yields_projected(fragment, row_slice, schema): + actual = fragment.to_table(schema=schema) + assert actual.schema == schema.schema - expected = table.slice(*row_slice).to_pandas()[[*columns]] + names = schema.names + expected = table.slice(*row_slice).to_pandas()[[*names]] assert actual.equals(pa.Table.from_pandas(expected)) fragment = list(dataset.get_fragments())[0] @@ -716,7 +710,7 @@ def assert_yields_projected(fragment, row_slice, columns): # manually re-construct a fragment, with explicit schema new_fragment = parquet_format.make_fragment( - fragment.path, fragment.filesystem, schema=dataset.schema, + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression) assert new_fragment.to_table().equals(fragment.to_table()) assert_yields_projected(new_fragment, (0, 4), table.column_names) @@ -750,6 +744,7 @@ def assert_yields_projected(fragment, row_slice, columns): partition_expression=fragment.partition_expression) +@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups(tempdir): @@ -760,21 +755,19 @@ def test_fragments_parquet_row_groups(tempdir): # list and scan row group fragments row_group_fragments = list(fragment.get_row_group_fragments()) assert len(row_group_fragments) == 2 - result = row_group_fragments[0].to_table() + result = row_group_fragments[0].to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] assert len(result) == 2 assert result.equals(table.slice(0, 2)) - # scanning row group fragment follows column projection / filter predicate - fragment = list(dataset.get_fragments( - columns=['part', 'f1'], filter=ds.field('f1') < 1))[0] + fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0] row_group_fragments = list(fragment.get_row_group_fragments()) assert len(row_group_fragments) == 1 - result = row_group_fragments[0].to_table() - assert result.column_names == ['part', 'f1'] + result = row_group_fragments[0].to_table(filter=ds.field('f1') < 1) assert len(result) == 1 +@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups_reconstruct(tempdir): @@ -786,7 +779,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): # manually re-construct row group fragments new_fragment = parquet_format.make_fragment( - 
fragment.path, fragment.filesystem, schema=dataset.schema, + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression, row_groups=[0]) result = new_fragment.to_table() @@ -794,11 +787,11 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): # manually re-construct a row group fragment with filter/column projection new_fragment = parquet_format.make_fragment( - fragment.path, fragment.filesystem, schema=dataset.schema, - columns=['f1', 'part'], filter=ds.field('f1') < 3, + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression, row_groups={1}) - result = new_fragment.to_table() + result = new_fragment.to_table(columns=['f1', 'part'], + filter=ds.field('f1') < 3, ) assert result.column_names == ['f1', 'part'] assert len(result) == 1 @@ -1272,9 +1265,7 @@ def test_filter_implicit_cast(tempdir): dataset = ds.dataset(str(path)) filter_ = ds.field('a') > 2 - scanner = ds.Scanner(dataset, filter=filter_) - result = scanner.to_table() - assert len(result) == 3 + assert len(dataset.to_table(filter=filter_)) == 3 def test_dataset_union(multisourcefs):
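One behavioural detail preserved by the shortened `test_filter_implicit_cast` above: `_insert_implicit_casts` still rewrites filter literals to the column's actual type before the expression is bound, so comparing a narrow integer column against a Python int keeps working through `Dataset.to_table`:

```python
import pyarrow.dataset as ds

# Assumed: column 'a' has a narrow integer type (e.g. int8); the literal
# 2 is implicitly cast to that type when the filter is bound.
table = dataset.to_table(filter=ds.field('a') > 2)
print(table.num_rows)
```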