From 4117d29598b2c665f3d636e7aa6c7d79caa67d3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Fri, 10 Apr 2020 07:46:33 -0400 Subject: [PATCH 1/4] Move ScanOptions at Fragment::Scan instead of a property --- cpp/src/arrow/dataset/dataset.cc | 29 ++-- cpp/src/arrow/dataset/dataset.h | 32 ++-- cpp/src/arrow/dataset/dataset_test.cc | 3 +- cpp/src/arrow/dataset/file_base.cc | 52 +++--- cpp/src/arrow/dataset/file_base.h | 16 +- cpp/src/arrow/dataset/file_ipc.cc | 70 ++++---- cpp/src/arrow/dataset/file_ipc.h | 1 + cpp/src/arrow/dataset/file_ipc_test.cc | 19 ++- cpp/src/arrow/dataset/file_parquet.cc | 34 ++-- cpp/src/arrow/dataset/file_parquet.h | 13 +- cpp/src/arrow/dataset/file_parquet_test.cc | 36 ++-- cpp/src/arrow/dataset/partition.cc | 52 +++--- cpp/src/arrow/dataset/partition.h | 8 +- cpp/src/arrow/dataset/partition_test.cc | 18 +- cpp/src/arrow/dataset/scanner.cc | 9 +- cpp/src/arrow/dataset/scanner.h | 5 +- cpp/src/arrow/dataset/scanner_internal.h | 44 ++++- cpp/src/arrow/dataset/test_util.h | 2 +- python/pyarrow/_dataset.pyx | 163 ++++++++++--------- python/pyarrow/includes/libarrow_dataset.pxd | 12 +- python/pyarrow/tests/test_dataset.py | 18 +- 21 files changed, 321 insertions(+), 315 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 37d0da0bbc5..f4dca0b1c26 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -30,34 +30,33 @@ namespace arrow { namespace dataset { -Fragment::Fragment(std::shared_ptr scan_options) - : scan_options_(std::move(scan_options)), partition_expression_(scalar(true)) {} - -const std::shared_ptr& Fragment::schema() const { - return scan_options_->schema(); +Fragment::Fragment(std::shared_ptr physical_schema, + std::shared_ptr partition_expression) + : physical_schema_(std::move(physical_schema)), + partition_expression_(partition_expression ? partition_expression : scalar(true)) { + /// TODO(ARROW-8065) + if (physical_schema_ == nullptr) { + physical_schema_ = schema({}); + } } InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options) - : Fragment(std::move(scan_options)), record_batches_(std::move(record_batches)) {} - -InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options, std::shared_ptr partition_expression) - : Fragment(std::move(scan_options), std::move(partition_expression)), + : Fragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(), + std::move(partition_expression)), record_batches_(std::move(record_batches)) {} -Result InMemoryFragment::Scan(std::shared_ptr context) { +Result InMemoryFragment::Scan(std::shared_ptr options, + std::shared_ptr context) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. 
auto batches_it = MakeVectorIterator(record_batches_); // RecordBatch -> ScanTask - auto scan_options = scan_options_; auto fn = [=](std::shared_ptr batch) -> std::shared_ptr { RecordBatchVector batches{batch}; return ::arrow::internal::make_unique( - std::move(batches), std::move(scan_options), std::move(context)); + std::move(batches), std::move(options), std::move(context)); }; return MakeMapIterator(fn, std::move(batches_it)); @@ -162,7 +161,7 @@ FragmentIterator InMemoryDataset::GetFragmentsImpl( batches.push_back(batch->Slice(batch_size * i, batch_size)); } - return std::make_shared(std::move(batches), scan_options); + return std::make_shared(std::move(batches)); }; return MakeMaybeMapIterator(std::move(create_fragment), get_batches_->Get()); diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 740263df3f8..758004e33ba 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -46,20 +46,15 @@ class ARROW_DS_EXPORT Fragment { /// columns may be absent if they were not present in this fragment. /// /// To receive a record batch stream which is fully filtered and projected, use Scanner. - virtual Result Scan(std::shared_ptr context) = 0; + virtual Result Scan(std::shared_ptr options, + std::shared_ptr context) = 0; /// \brief Return true if the fragment can benefit from parallel scanning. virtual bool splittable() const = 0; virtual std::string type_name() const = 0; - /// \brief Filtering, schema reconciliation, and partition options to use when - /// scanning this fragment. - const std::shared_ptr& scan_options() const { return scan_options_; } - - const std::shared_ptr& schema() const; - - virtual ~Fragment() = default; + const std::shared_ptr& physical_schema() const { return physical_schema_; } /// \brief An expression which evaluates to true for all data viewed by this /// Fragment. @@ -67,15 +62,13 @@ class ARROW_DS_EXPORT Fragment { return partition_expression_; } - protected: - explicit Fragment(std::shared_ptr scan_options); + virtual ~Fragment() = default; - Fragment(std::shared_ptr scan_options, - std::shared_ptr partition_expression) - : scan_options_(std::move(scan_options)), - partition_expression_(std::move(partition_expression)) {} + protected: + Fragment(std::shared_ptr physical_schema = NULLPTR, + std::shared_ptr partition_expression = NULLPTR); - std::shared_ptr scan_options_; + std::shared_ptr physical_schema_; std::shared_ptr partition_expression_; }; @@ -84,13 +77,10 @@ class ARROW_DS_EXPORT Fragment { class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options); - - InMemoryFragment(RecordBatchVector record_batches, - std::shared_ptr scan_options, - std::shared_ptr partition_expression); + std::shared_ptr = NULLPTR); - Result Scan(std::shared_ptr context) override; + Result Scan(std::shared_ptr options, + std::shared_ptr context) override; bool splittable() const override { return false; } diff --git a/cpp/src/arrow/dataset/dataset_test.cc b/cpp/src/arrow/dataset/dataset_test.cc index bcaacca6b17..4be6e33e2c0 100644 --- a/cpp/src/arrow/dataset/dataset_test.cc +++ b/cpp/src/arrow/dataset/dataset_test.cc @@ -44,8 +44,7 @@ TEST_F(TestInMemoryFragment, Scan) { auto reader = ConstantArrayGenerator::Repeat(kNumberBatches, batch); // Creates a InMemoryFragment of the same repeated batch. 
- auto fragment = - InMemoryFragment({static_cast(kNumberBatches), batch}, options_); + auto fragment = InMemoryFragment({static_cast(kNumberBatches), batch}); AssertFragmentEquals(reader.get(), &fragment); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index fee471d975f..d9eedea1f14 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -55,26 +55,26 @@ Result> FileSource::OpenWritable() cons return std::make_shared<::arrow::io::BufferOutputStream>(b); } -Result> FileFormat::MakeFragment( - FileSource source, std::shared_ptr options) { - return MakeFragment(std::move(source), std::move(options), scalar(true)); +Result> FileFormat::MakeFragment(FileSource source) { + return MakeFragment(std::move(source), scalar(true)); } Result> FileFormat::MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression) { + FileSource source, std::shared_ptr partition_expression) { return std::shared_ptr(new FileFragment( - std::move(source), shared_from_this(), options, std::move(partition_expression))); + std::move(source), shared_from_this(), std::move(partition_expression))); } Result> FileFormat::WriteFragment( FileSource destination, std::shared_ptr fragment, + std::shared_ptr scan_options, std::shared_ptr scan_context) { return Status::NotImplemented("writing fragment of format ", type_name()); } -Result FileFragment::Scan(std::shared_ptr context) { - return format_->ScanFile(source_, scan_options_, std::move(context)); +Result FileFragment::Scan(std::shared_ptr options, + std::shared_ptr context) { + return format_->ScanFile(source_, std::move(options), std::move(context)); } FileSystemDataset::FileSystemDataset(std::shared_ptr schema, @@ -167,11 +167,9 @@ std::shared_ptr FoldingAnd(const std::shared_ptr& l, } FragmentIterator FileSystemDataset::GetFragmentsImpl( - std::shared_ptr root_options) { + std::shared_ptr scan_options) { FragmentVector fragments; - std::vector> options(forest_.size()); - ExpressionVector fragment_partitions(forest_.size()); auto collect_fragments = [&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune { @@ -180,33 +178,24 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( // if available, copy parent's filter and projector // (which are appropriately simplified and loaded with default values) if (auto parent = ref.parent()) { - options[ref.i].reset(new ScanOptions(*options[parent.i])); - fragment_partitions[ref.i] = - FoldingAnd(fragment_partitions[parent.i], partitions_[ref.i]); + fragment_partitions[ref.i] = FoldingAnd(fragment_partitions[parent.i], partition); } else { - options[ref.i].reset(new ScanOptions(*root_options)); - fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partitions_[ref.i]); + fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partition); } // simplify filter by partition information - auto filter = options[ref.i]->filter->Assume(partition); - options[ref.i]->filter = filter; - + auto filter = scan_options->filter->Assume(partition); if (filter->IsNull() || filter->Equals(false)) { // directories (and descendants) which can't satisfy the filter are pruned return fs::PathForest::Prune; } - // if possible, extract a partition key and pass it to the projector - RETURN_NOT_OK(KeyValuePartitioning::SetDefaultValuesFromKeys( - *partition, &options[ref.i]->projector)); - if (ref.info().IsFile()) { // generate a fragment for this file FileSource src(ref.info().path(), filesystem_.get()); - 
ARROW_ASSIGN_OR_RAISE(auto fragment, - format_->MakeFragment(std::move(src), options[ref.i], - std::move(fragment_partitions[ref.i]))); + ARROW_ASSIGN_OR_RAISE( + auto fragment, + format_->MakeFragment(std::move(src), std::move(fragment_partitions[ref.i]))); fragments.push_back(std::move(fragment)); } @@ -222,9 +211,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( } Result> FileSystemDataset::Write( - const WritePlan& plan, std::shared_ptr scan_context) { - std::vector> options(plan.paths.size()); - + const WritePlan& plan, std::shared_ptr scan_options, + std::shared_ptr scan_context) { auto filesystem = plan.filesystem; if (filesystem == nullptr) { filesystem = std::make_shared(); @@ -251,9 +239,9 @@ Result> FileSystemDataset::Write( const auto& fragment = util::get>(op); FileSource dest(files[i].path(), filesystem.get()); - ARROW_ASSIGN_OR_RAISE( - auto write_task, - plan.format->WriteFragment(std::move(dest), fragment, scan_context)); + ARROW_ASSIGN_OR_RAISE(auto write_task, + plan.format->WriteFragment(std::move(dest), fragment, + scan_options, scan_context)); task_group->Append([write_task] { return write_task->Execute(); }); } diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index e6d893193ff..d19fa945baa 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -144,23 +144,23 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression); + FileSource source, std::shared_ptr partition_expression); - Result> MakeFragment( - FileSource source, std::shared_ptr options); + Result> MakeFragment(FileSource source); /// \brief Write a fragment. If the parent directory of destination does not exist, it /// will be created. virtual Result> WriteFragment( FileSource destination, std::shared_ptr fragment, + std::shared_ptr options, std::shared_ptr scan_context); // FIXME(bkietz) make this pure virtual }; /// \brief A Fragment that is stored in a file with a known format class ARROW_DS_EXPORT FileFragment : public Fragment { public: - Result Scan(std::shared_ptr context) override; + Result Scan(std::shared_ptr options, + std::shared_ptr context) override; std::string type_name() const override { return format_->type_name(); } bool splittable() const override { return format_->splittable(); } @@ -170,9 +170,8 @@ class ARROW_DS_EXPORT FileFragment : public Fragment { protected: FileFragment(FileSource source, std::shared_ptr format, - std::shared_ptr scan_options, std::shared_ptr partition_expression) - : Fragment(std::move(scan_options), std::move(partition_expression)), + : Fragment(NULLPTR, std::move(partition_expression)), source_(std::move(source)), format_(std::move(format)) {} @@ -240,7 +239,8 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { /// \param[in] plan the WritePlan to execute. /// \param[in] scan_context context in which to scan fragments before writing. 
static Result> Write( - const WritePlan& plan, std::shared_ptr scan_context); + const WritePlan& plan, std::shared_ptr scan_options, + std::shared_ptr scan_context); std::string type_name() const override { return "filesystem"; } diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index 5d1b531a0a9..abd61f960d4 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -158,44 +158,56 @@ Result IpcFileFormat::ScanFile( return IpcScanTaskIterator::Make(options, context, source); } -Result> IpcFileFormat::WriteFragment( - FileSource destination, std::shared_ptr fragment, - std::shared_ptr scan_context) { - struct Task : WriteTask { - Task(FileSource destination, std::shared_ptr format, - std::shared_ptr fragment, std::shared_ptr scan_context) - : WriteTask(std::move(destination), std::move(format)), - fragment_(std::move(fragment)), - scan_context_(std::move(scan_context)) {} +namespace internal { +class IpcWriteTask : public WriteTask { + public: + IpcWriteTask(FileSource destination, std::shared_ptr format, + std::shared_ptr fragment, + std::shared_ptr scan_options, + std::shared_ptr scan_context) + : WriteTask(std::move(destination), std::move(format)), + fragment_(std::move(fragment)), + scan_options_(std::move(scan_options)), + scan_context_(std::move(scan_context)) {} - Status Execute() override { - RETURN_NOT_OK(CreateDestinationParentDir()); + Status Execute() override { + RETURN_NOT_OK(CreateDestinationParentDir()); - ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable()); - ARROW_ASSIGN_OR_RAISE(auto writer, - ipc::NewFileWriter(out_stream.get(), fragment_->schema())); - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment_->Scan(scan_context_)); + auto schema = scan_options_->schema(); - for (auto maybe_scan_task : scan_task_it) { - ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); + ARROW_ASSIGN_OR_RAISE(auto out_stream, destination_.OpenWritable()); + ARROW_ASSIGN_OR_RAISE(auto writer, ipc::NewFileWriter(out_stream.get(), schema)); + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, + fragment_->Scan(scan_options_, scan_context_)); - ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); + for (auto maybe_scan_task : scan_task_it) { + ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); - for (auto maybe_batch : batch_it) { - ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch)); - RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); - } - } + ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); - return writer->Close(); + for (auto maybe_batch : batch_it) { + ARROW_ASSIGN_OR_RAISE(auto batch, std::move(maybe_batch)); + RETURN_NOT_OK(writer->WriteRecordBatch(*batch)); + } } - std::shared_ptr fragment_; - std::shared_ptr scan_context_; - }; + return writer->Close(); + } + + private: + std::shared_ptr fragment_; + std::shared_ptr scan_options_; + std::shared_ptr scan_context_; +}; +} // namespace internal - return std::make_shared(std::move(destination), shared_from_this(), - std::move(fragment), std::move(scan_context)); +Result> IpcFileFormat::WriteFragment( + FileSource destination, std::shared_ptr fragment, + std::shared_ptr scan_options, + std::shared_ptr scan_context) { + return std::make_shared( + std::move(destination), shared_from_this(), std::move(fragment), + std::move(scan_options), std::move(scan_context)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index 57c98f54d24..6eef2ac8ed2 100644 --- 
a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -47,6 +47,7 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { Result> WriteFragment( FileSource destination, std::shared_ptr fragment, + std::shared_ptr options, std::shared_ptr context) override; }; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 8b1d88491d7..7d04126e0fa 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -99,7 +99,7 @@ class TestIpcFileFormat : public ArrowIpcWriterMixin { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); return Batches(std::move(scan_task_it)); } @@ -115,7 +115,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -132,11 +132,12 @@ TEST_F(TestIpcFileFormat, WriteRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); EXPECT_OK_AND_ASSIGN(auto sink, GetFileSink()); - EXPECT_OK_AND_ASSIGN(auto write_task, format_->WriteFragment(sink, fragment, ctx_)); + EXPECT_OK_AND_ASSIGN(auto write_task, + format_->WriteFragment(sink, fragment, opts_, ctx_)); ASSERT_OK(write_task->Execute()); @@ -186,14 +187,14 @@ TEST_F(TestIpcFileSystemDataset, Write) { opts_ = ScanOptions::Make(schema); auto partitioning_factory = DirectoryPartitioning::MakeFactory({"str", "i32"}); - ASSERT_OK_AND_ASSIGN( - auto plan, partitioning_factory->MakeWritePlan(dataset_->GetFragments(options_))); + ASSERT_OK_AND_ASSIGN(auto plan, partitioning_factory->MakeWritePlan( + schema, dataset_->GetFragments(options_))); plan.format = format_; plan.filesystem = fs_; plan.partition_base_dir = "new_root/"; - ASSERT_OK_AND_ASSIGN(auto written, FileSystemDataset::Write(plan, ctx_)); + ASSERT_OK_AND_ASSIGN(auto written, FileSystemDataset::Write(plan, opts_, ctx_)); using E = TestExpression; std::vector actual_partitions; @@ -249,7 +250,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjected) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -282,7 +283,7 @@ TEST_F(TestIpcFileFormat, ScanRecordBatchReaderProjectedMissingCols) { auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()}; for (auto reader : readers) { auto source = GetFileSource(reader); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); // NB: projector is applied by the scanner; Fragment does not evaluate it. // We will not drop "i32" even though it is not in the projector's schema. 
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 107f9c42f1e..7daf1400976 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -402,19 +402,17 @@ Result ParquetFileFormat::ScanFile( } Result> ParquetFileFormat::MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression, std::vector row_groups) { + FileSource source, std::shared_ptr partition_expression, + std::vector row_groups) { return std::shared_ptr( - new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options), + new ParquetFileFragment(std::move(source), shared_from_this(), std::move(partition_expression), std::move(row_groups))); } Result> ParquetFileFormat::MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression) { - return std::shared_ptr( - new ParquetFileFragment(std::move(source), shared_from_this(), std::move(options), - std::move(partition_expression), {})); + FileSource source, std::shared_ptr partition_expression) { + return std::shared_ptr(new ParquetFileFragment( + std::move(source), shared_from_this(), std::move(partition_expression), {})); } Result ParquetFileFormat::GetRowGroupFragments( @@ -433,26 +431,22 @@ Result ParquetFileFormat::GetRowGroupFragments( } FragmentVector fragments(row_groups.size()); - auto new_options = std::make_shared(*fragment.scan_options()); - if (!extra_filter->Equals(true)) { - new_options->filter = and_(std::move(extra_filter), std::move(new_options->filter)); - } - - RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), - new_options->filter, std::move(row_groups)); + RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), extra_filter, + std::move(row_groups)); for (int i = 0, row_group = skipper.Next(); row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) { - ARROW_ASSIGN_OR_RAISE(fragments[i++], - MakeFragment(fragment.source(), new_options, - fragment.partition_expression(), {row_group})); + ARROW_ASSIGN_OR_RAISE( + fragments[i++], + MakeFragment(fragment.source(), fragment.partition_expression(), {row_group})); } return MakeVectorIterator(std::move(fragments)); } -Result ParquetFileFragment::Scan(std::shared_ptr context) { - return parquet_format().ScanFile(source_, scan_options_, std::move(context), +Result ParquetFileFragment::Scan(std::shared_ptr options, + std::shared_ptr context) { + return parquet_format().ScanFile(source_, std::move(options), std::move(context), row_groups_); } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index a1ab047c8d1..4ce0f8af297 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -100,13 +100,12 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { using FileFormat::MakeFragment; Result> MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression) override; + FileSource source, std::shared_ptr partition_expression) override; /// \brief Create a Fragment, restricted to the specified row groups. Result> MakeFragment( - FileSource source, std::shared_ptr options, - std::shared_ptr partition_expression, std::vector row_groups); + FileSource source, std::shared_ptr partition_expression, + std::vector row_groups); /// \brief Split a ParquetFileFragment into a Fragment for each row group. 
/// Row groups whose metadata contradicts the fragment's filter or the extra_filter @@ -118,7 +117,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { public: - Result Scan(std::shared_ptr context) override; + Result Scan(std::shared_ptr options, + std::shared_ptr context) override; /// \brief The row groups viewed by this Fragment. This may be empty which signifies all /// row groups are selected. @@ -126,10 +126,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment { private: ParquetFileFragment(FileSource source, std::shared_ptr format, - std::shared_ptr scan_options, std::shared_ptr partition_expression, std::vector row_groups) - : FileFragment(std::move(source), std::move(format), std::move(scan_options), + : FileFragment(std::move(source), std::move(format), std::move(partition_expression)), row_groups_(std::move(row_groups)) {} diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 9a7c8fbfb45..0a1ec2acbe3 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -149,7 +149,7 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { } RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); return Batches(std::move(scan_task_it)); } @@ -181,14 +181,10 @@ class TestParquetFileFormat : public ArrowParquetWriterMixin { void CountRowGroupsInFragment(const std::shared_ptr& fragment, std::vector expected_row_groups, - const Expression& filter, - const Expression& extra_filter = *scalar(true)) { - fragment->scan_options()->filter = filter.Copy(); - + const Expression& filter) { auto parquet_fragment = checked_pointer_cast(fragment); - ASSERT_OK_AND_ASSIGN( - auto row_group_fragments, - format_->GetRowGroupFragments(*parquet_fragment, extra_filter.Copy())); + ASSERT_OK_AND_ASSIGN(auto row_group_fragments, + format_->GetRowGroupFragments(*parquet_fragment, filter.Copy())); auto expected_row_group = expected_row_groups.begin(); for (auto maybe_fragment : row_group_fragments) { @@ -214,7 +210,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -235,9 +231,9 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderDictEncoded) { opts_ = ScanOptions::Make(reader->schema()); format_->reader_options.dict_columns = {"utf8"}; - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); - ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(ctx_)); + ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_, ctx_)); int64_t row_count = 0; Schema expected_schema({field("utf8", dictionary(int32(), utf8()))}); @@ -283,7 +279,7 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReaderProjected) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); int64_t row_count = 0; @@ -316,7 +312,7 @@ TEST_F(TestParquetFileFormat, 
ScanRecordBatchReaderProjectedMissingCols) { auto readers = {reader.get(), reader_without_i32.get(), reader_without_f64.get()}; for (auto reader : readers) { auto source = GetFileSource(reader); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); // NB: projector is applied by the scanner; ParquetFragment does not evaluate it. // We will not drop "i32" even though it is not in the projector's schema. @@ -404,7 +400,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdown) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); opts_->filter = scalar(true); CountRowsAndBatchesInScan(fragment, kTotalNumRows, kNumRowGroups); @@ -444,7 +440,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { auto source = GetFileSource(reader.get()); opts_ = ScanOptions::Make(reader->schema()); - ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source, opts_)); + ASSERT_OK_AND_ASSIGN(auto fragment, format_->MakeFragment(*source)); CountRowGroupsInFragment(fragment, internal::Iota(static_cast(kTotalNumRows)), *scalar(true)); @@ -460,7 +456,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { // No rows match 1 and 2. CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(1) and "u8"_ == uint8_t(2)); - CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(2), "i64"_ == int64_t(4)); + CountRowGroupsInFragment(fragment, {}, "i64"_ == int64_t(2) and "i64"_ == int64_t(4)); CountRowGroupsInFragment(fragment, {1, 3}, "i64"_ == int64_t(2) or "i64"_ == int64_t(4)); @@ -470,8 +466,8 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) { CountRowGroupsInFragment(fragment, internal::Iota(5, static_cast(kNumRowGroups)), "i64"_ >= int64_t(6)); - CountRowGroupsInFragment(fragment, {5, 6, 7}, "i64"_ >= int64_t(6), - "i64"_ < int64_t(8)); + CountRowGroupsInFragment(fragment, {5, 6, 7}, + "i64"_ >= int64_t(6) && "i64"_ < int64_t(8)); } TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { @@ -485,7 +481,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { auto row_groups_fragment = [&](std::vector row_groups) { EXPECT_OK_AND_ASSIGN(auto fragment, - format_->MakeFragment(*source, opts_, scalar(true), row_groups)); + format_->MakeFragment(*source, scalar(true), row_groups)); return internal::checked_pointer_cast(fragment); }; @@ -517,7 +513,7 @@ TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) { EXPECT_RAISES_WITH_MESSAGE_THAT( IndexError, testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"), - row_groups_fragment({kNumRowGroups + 1})->Scan(ctx_)); + row_groups_fragment({kNumRowGroups + 1})->Scan(opts_, ctx_)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 676eb752aa4..4a08081a4e0 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -64,13 +64,15 @@ std::shared_ptr Partitioning::Default() { return std::make_shared(); } -Result PartitioningFactory::MakeWritePlan(FragmentIterator fragment_it) { +Result PartitioningFactory::MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragment_it) { return Status::NotImplemented("MakeWritePlan from PartitioningFactory of type ", type_name()); } Result PartitioningFactory::MakeWritePlan( - 
FragmentIterator fragment_it, const std::shared_ptr& schema) { + std::shared_ptr schema, FragmentIterator fragment_it, + std::shared_ptr partition_schema) { return Status::NotImplemented("MakeWritePlan from PartitioningFactory of type ", type_name()); } @@ -297,10 +299,12 @@ class DirectoryPartitioningFactory : public PartitioningFactory { struct MakeWritePlanImpl; - Result MakeWritePlan(FragmentIterator fragments) override; + Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments) override; - Result MakeWritePlan(FragmentIterator fragments, - const std::shared_ptr& schema) override; + Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments, + std::shared_ptr partition_schema) override; private: std::vector field_names_; @@ -309,9 +313,10 @@ class DirectoryPartitioningFactory : public PartitioningFactory { struct DirectoryPartitioningFactory::MakeWritePlanImpl { using Indices = std::basic_string; - MakeWritePlanImpl(DirectoryPartitioningFactory* factory, + MakeWritePlanImpl(DirectoryPartitioningFactory* factory, std::shared_ptr schema, FragmentVector source_fragments) : this_(factory), + schema_(std::move(schema)), source_fragments_(std::move(source_fragments)), right_hand_sides_(source_fragments_.size(), Indices(num_fields(), -1)) {} @@ -416,23 +421,6 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { return std::to_string(milliseconds_since_epoch); } - // remove fields which will be implicit in the partitioning; writing them to files would - // be redundant - Status DropPartitionFields(const std::shared_ptr& partitioning, - Fragment* fragment) { - auto schema = fragment->schema(); - for (const auto& field : partitioning->schema()->fields()) { - int field_i = schema->GetFieldIndex(field->name()); - if (field_i != -1) { - ARROW_ASSIGN_OR_RAISE(schema, schema->RemoveField(field_i)); - } - } - - // the fragment being scanned to disk will now deselect redundant columns - fragment->scan_options()->projector = RecordBatchProjector(std::move(schema)); - return Status::OK(); - } - Result Finish(std::shared_ptr partitioning_schema = nullptr) && { WritePlan out; @@ -444,10 +432,9 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { ARROW_ASSIGN_OR_RAISE(out.partitioning, this_->Finish(std::move(partitioning_schema))); - auto fragment_schema = - source_fragments_.empty() ? schema({}) : source_fragments_.front()->schema(); + // There's no guarantee that all Fragments have the same schema. ARROW_ASSIGN_OR_RAISE(out.schema, - UnifySchemas({out.partitioning->schema(), fragment_schema})); + UnifySchemas({out.partitioning->schema(), schema_})); // Lexicographic ordering WRT right_hand_sides_ ensures that source_fragments_ are in // a depth first visitation order WRT their partition expressions. 
This makes @@ -473,9 +460,6 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { Indices current_parents(num_fields() + 1, -1); for (size_t fragment_i = 0; fragment_i < source_fragments_.size(); ++fragment_i) { - RETURN_NOT_OK( - DropPartitionFields(out.partitioning, source_fragments_[fragment_i].get())); - int field_i = 0; for (; field_i < num_fields(); ++field_i) { // these directories have already been created and we're still writing their @@ -528,6 +512,7 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { } DirectoryPartitioningFactory* this_; + std::shared_ptr schema_; FragmentVector source_fragments_; struct { @@ -552,15 +537,16 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { }; Result DirectoryPartitioningFactory::MakeWritePlan( - FragmentIterator fragment_it, const std::shared_ptr& schema) { + std::shared_ptr schema, FragmentIterator fragment_it, + std::shared_ptr partition_schema) { ARROW_ASSIGN_OR_RAISE(auto fragments, fragment_it.ToVector()); - return MakeWritePlanImpl(this, std::move(fragments)).Finish(schema); + return MakeWritePlanImpl(this, schema, std::move(fragments)).Finish(partition_schema); } Result DirectoryPartitioningFactory::MakeWritePlan( - FragmentIterator fragment_it) { + std::shared_ptr schema, FragmentIterator fragment_it) { ARROW_ASSIGN_OR_RAISE(auto fragments, fragment_it.ToVector()); - return MakeWritePlanImpl(this, std::move(fragments)).Finish(); + return MakeWritePlanImpl(this, schema, std::move(fragments)).Finish(); } std::shared_ptr DirectoryPartitioning::MakeFactory( diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 463d06cb40a..12edd75800c 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -104,10 +104,12 @@ class ARROW_DS_EXPORT PartitioningFactory { // FIXME(bkietz) Make these pure virtual /// Construct a WritePlan for the provided fragments - virtual Result MakeWritePlan(FragmentIterator fragments, - const std::shared_ptr& schema); + virtual Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments, + std::shared_ptr partition_schema); /// Construct a WritePlan for the provided fragments, inferring schema - virtual Result MakeWritePlan(FragmentIterator fragments); + virtual Result MakeWritePlan(std::shared_ptr schema, + FragmentIterator fragments); }; /// \brief Subclass for representing the default, a partitioning that returns diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index ec9e826afc2..a77c197d91c 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -304,8 +304,7 @@ class TestPartitioningWritePlan : public ::testing::Test { FragmentIterator MakeFragments(const ExpressionVector& partition_expressions) { fragments_.clear(); for (const auto& expr : partition_expressions) { - fragments_.emplace_back( - new InMemoryFragment(RecordBatchVector{}, scan_options_, expr)); + fragments_.emplace_back(new InMemoryFragment(RecordBatchVector{}, expr)); } return MakeVectorIterator(fragments_); } @@ -321,27 +320,30 @@ class TestPartitioningWritePlan : public ::testing::Test { template void MakeWritePlan(const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - EXPECT_OK_AND_ASSIGN(plan_, factory_->MakeWritePlan(std::move(fragments))); + EXPECT_OK_AND_ASSIGN(plan_, + factory_->MakeWritePlan(schema({}), std::move(fragments))); } template Status MakeWritePlanError(const E&... 
partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - return factory_->MakeWritePlan(std::move(fragments)).status(); + return factory_->MakeWritePlan(schema({}), std::move(fragments)).status(); } template - void MakeWritePlanWithSchema(const std::shared_ptr& schema, + void MakeWritePlanWithSchema(const std::shared_ptr& partition_schema, const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - EXPECT_OK_AND_ASSIGN(plan_, factory_->MakeWritePlan(std::move(fragments), schema)); + EXPECT_OK_AND_ASSIGN(plan_, factory_->MakeWritePlan(schema({}), std::move(fragments), + partition_schema)); } template - Status MakeWritePlanWithSchemaError(const std::shared_ptr& schema, + Status MakeWritePlanWithSchemaError(const std::shared_ptr& partition_schema, const E&... partition_expressions) { auto fragments = MakeFragments(partition_expressions...); - return factory_->MakeWritePlan(std::move(fragments), schema).status(); + return factory_->MakeWritePlan(schema({}), std::move(fragments), partition_schema) + .status(); } struct ExpectedWritePlan { diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index a78450ba219..199571d2eb1 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -80,14 +80,7 @@ Result Scanner::Scan() { // Transforms Iterator into a unified // Iterator. The first Iterator::Next invocation is going to do // all the work of unwinding the chained iterators. - auto scan_task_it = GetScanTaskIterator(GetFragments(), scan_context_); - - // Apply the filter and/or projection to incoming RecordBatches by - // wrapping the ScanTask with a FilterAndProjectScanTask - auto wrap_scan_task = [](std::shared_ptr task) -> std::shared_ptr { - return std::make_shared(std::move(task)); - }; - return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); + return GetScanTaskIterator(GetFragments(), scan_options_, scan_context_); } Result ScanTaskIteratorFromRecordBatch( diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index e94efd4874a..a41af106c44 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -158,9 +158,10 @@ class ARROW_DS_EXPORT Scanner { scan_options_(std::move(scan_options)), scan_context_(std::move(scan_context)) {} - Scanner(std::shared_ptr fragment, std::shared_ptr scan_context) + Scanner(std::shared_ptr fragment, std::shared_ptr scan_options, + std::shared_ptr scan_context) : fragment_(std::move(fragment)), - scan_options_(fragment_->scan_options()), + scan_options_(std::move(scan_options)), scan_context_(std::move(scan_context)) {} /// \brief The Scan operator returns a stream of ScanTask. 
The caller is diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h index 0cbd4a55d2a..0aad0d63216 100644 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ b/cpp/src/arrow/dataset/scanner_internal.h @@ -22,6 +22,7 @@ #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/filter.h" +#include "arrow/dataset/partition.h" #include "arrow/dataset/scanner.h" namespace arrow { @@ -55,28 +56,55 @@ inline RecordBatchIterator ProjectRecordBatch(RecordBatchIterator it, class FilterAndProjectScanTask : public ScanTask { public: - explicit FilterAndProjectScanTask(std::shared_ptr task) - : ScanTask(task->options(), task->context()), task_(std::move(task)) {} + explicit FilterAndProjectScanTask(std::shared_ptr task, + std::shared_ptr partition) + : ScanTask(task->options(), task->context()), + task_(std::move(task)), + partition_(std::move(partition)), + filter_(NULLPTR), + projector_(options()->projector) {} Result Execute() override { ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); - auto filter_it = FilterRecordBatch(std::move(it), *options_->evaluator, - *options_->filter, context_->pool); - return ProjectRecordBatch(std::move(filter_it), &task_->options()->projector, - context_->pool); + + filter_ = options()->filter->Assume(partition_); + auto filter_it = + FilterRecordBatch(std::move(it), *options_->evaluator, *filter_, context_->pool); + + if (partition_) { + RETURN_NOT_OK( + KeyValuePartitioning::SetDefaultValuesFromKeys(*partition_, &projector_)); + } + return ProjectRecordBatch(std::move(filter_it), &projector_, context_->pool); } private: std::shared_ptr task_; + std::shared_ptr partition_; + std::shared_ptr filter_; + RecordBatchProjector projector_; }; /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. inline ScanTaskIterator GetScanTaskIterator(FragmentIterator fragments, + std::shared_ptr options, std::shared_ptr context) { // Fragment -> ScanTaskIterator - auto fn = [context](std::shared_ptr fragment) { - return fragment->Scan(context); + auto fn = [options, + context](std::shared_ptr fragment) -> Result { + ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options, context)); + + auto partition = fragment->partition_expression(); + // Apply the filter and/or projection to incoming RecordBatches by + // wrapping the ScanTask with a FilterAndProjectScanTask + auto wrap_scan_task = + [partition](std::shared_ptr task) -> std::shared_ptr { + return std::make_shared(std::move(task), + std::move(partition)); + }; + + return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); }; // Iterator> diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index b7feaef2b46..1ae2decc3f7 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -119,7 +119,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. 
void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(ctx_)); + ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_, ctx_)); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 510491319f0..dad6e85758c 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -120,10 +120,14 @@ cdef class Dataset: """ return Scanner(self, columns=columns, filter=filter).get_fragments() - def scan(self, columns=None, filter=None, MemoryPool memory_pool=None): + def _scanner(self, **kwargs): + return Scanner(self, **kwargs) + + def scan(self, columns=None, filter=None, + MemoryPool memory_pool=None, **kwargs): """Builds a scan operation against the dataset. - It poduces a stream of ScanTasks which is meant to be a unit of work to + It produces a stream of ScanTasks which is meant to be a unit of work to be dispatched. The tasks are not executed automatically, the user is responsible to execute and dispatch the individual tasks, so custom local task scheduling can be implemented. @@ -152,12 +156,10 @@ cdef class Dataset: ------- scan_tasks : iterator of ScanTask """ - scanner = Scanner(self, columns=columns, filter=filter, - memory_pool=memory_pool) - return scanner.scan() + return self._scanner(columns=columns, filter=filter, + memory_pool=memory_pool, **kwargs).scan() - def to_batches(self, columns=None, filter=None, - MemoryPool memory_pool=None): + def to_batches(self, **kwargs): """Read the dataset as materialized record batches. Builds a scan operation against the dataset and sequentially executes @@ -187,14 +189,9 @@ cdef class Dataset: ------- record_batches : iterator of RecordBatch """ - scanner = Scanner(self, columns=columns, filter=filter, - memory_pool=memory_pool) - for task in scanner.scan(): - for batch in task.execute(): - yield batch + return self._scanner(**kwargs).to_batches() - def to_table(self, columns=None, filter=None, use_threads=True, - MemoryPool memory_pool=None): + def to_table(self, **kwargs): """Read the dataset to an arrow table. 
Note that this method reads all the selected data from the dataset @@ -227,9 +224,7 @@ cdef class Dataset: ------- table : Table instance """ - scanner = Scanner(self, columns=columns, filter=filter, - use_threads=use_threads, memory_pool=memory_pool) - return scanner.to_table() + return self._scanner(**kwargs).to_table() @property def schema(self): @@ -378,30 +373,6 @@ cdef shared_ptr[CExpression] _insert_implicit_casts(Expression filter, ) -cdef shared_ptr[CScanOptions] _make_scan_options( - Schema schema, Expression partition_expression, object columns=None, - Expression filter=None) except *: - cdef: - shared_ptr[CScanOptions] options - CExpression* c_partition_expression - - assert schema is not None - assert partition_expression is not None - - filter = Expression.wrap(_insert_implicit_casts(filter, schema)) - filter = filter.assume(partition_expression) - - empty_dataset = UnionDataset(schema, children=[]) - scanner = Scanner(empty_dataset, columns=columns, filter=filter) - options = scanner.unwrap().get().options() - - c_partition_expression = partition_expression.unwrap().get() - check_status(CSetPartitionKeysInProjector(deref(c_partition_expression), - &options.get().projector)) - - return options - - cdef class FileFormat: cdef: @@ -443,7 +414,6 @@ cdef class FileFormat: return pyarrow_wrap_schema(move(c_schema)) def make_fragment(self, str path not None, FileSystem filesystem not None, - Schema schema=None, columns=None, filter=None, Expression partition_expression=ScalarExpression(True)): """ Make a FileFragment of this FileFormat. The filter may not reference @@ -451,19 +421,11 @@ cdef class FileFormat: one will be inferred. """ cdef: - shared_ptr[CScanOptions] c_options shared_ptr[CFileFragment] c_fragment - if schema is None: - schema = self.inspect(path, filesystem) - - c_options = _make_scan_options(schema, partition_expression, - columns, filter) - c_fragment = GetResultValue( self.format.MakeFragment(CFileSource(tobytes(path), filesystem.unwrap().get()), - move(c_options), partition_expression.unwrap())) return Fragment.wrap( move(c_fragment)) @@ -510,7 +472,7 @@ cdef class Fragment: @property def schema(self): """Return the schema of batches scanned from this Fragment.""" - return pyarrow_wrap_schema(self.fragment.schema()) + return pyarrow_wrap_schema(self.fragment.physical_schema()) @property def partition_expression(self): @@ -519,30 +481,69 @@ cdef class Fragment: """ return Expression.wrap(self.fragment.partition_expression()) - def to_table(self, use_threads=True, MemoryPool memory_pool=None): - """Convert this Fragment into a Table. + def _scanner(self, **kwargs): + return Scanner._from_fragment(self, **kwargs) - Use this convenience utility with care. This will serially materialize - the Scan result in memory before creating the Table. + def scan(self, columns=None, filter=None, use_threads=True, + MemoryPool memory_pool=None, **kwargs): + """Builds a scan operation against the dataset. + + It produces a stream of ScanTasks which is meant to be a unit of work to + be dispatched. The tasks are not executed automatically, the user is + responsible to execute and dispatch the individual tasks, so custom + local task scheduling can be implemented. + + Parameters + ---------- + columns : list of str, default None + List of columns to project. Order and duplicates will be preserved. 
+ The columns will be passed down to Datasets and corresponding data + fragments to avoid loading, copying, and deserializing columns + that will not be required further down the compute chain. + By default all of the available columns are projected. Raises + an exception if any of the referenced column names does not exist + in the dataset's Schema. + filter : Expression, default None + Scan will return only the rows matching the filter. + If possible the predicate will be pushed down to exploit the + partition information or internal metadata found in fragments, + e.g. Parquet statistics. Otherwise filters the loaded + RecordBatches before yielding them. + memory_pool : MemoryPool, default None + For memory allocations, if required. If not specified, uses the + default pool. Returns ------- - table : Table + scan_tasks : iterator of ScanTask """ - scanner = Scanner._from_fragment(self, use_threads, memory_pool) - return scanner.to_table() + return self._scanner(columns=columns, filter=filter, + use_threads=use_threads,memory_pool=memory_pool, **kwargs).scan() - def scan(self, MemoryPool memory_pool=None): - """Returns a stream of ScanTasks + def to_batches(self, **kwargs): + """Read the fragment as materialized record batches. - The caller is responsible to dispatch/schedule said tasks. Tasks should - be safe to run in a concurrent fashion and outlive the iterator. + See scan() method arguments. Returns ------- - scan_tasks : iterator of ScanTask + record_batches : iterator of RecordBatch """ - return Scanner._from_fragment(self, memory_pool).scan() + return self._scanner(**kwargs).to_batches() + + def to_table(self, **kwargs): + """Convert this Fragment into a Table. + + Use this convenience utility with care. This will serially materialize + the Scan result in memory before creating the Table. + + See scan() method arguments. 
+ + Returns + ------- + table : Table + """ + return self._scanner(**kwargs).to_table() cdef class FileFragment(Fragment): @@ -734,31 +735,23 @@ cdef class ParquetFileFormat(FileFormat): return ParquetFileFormat, (self.read_options,) def make_fragment(self, str path not None, FileSystem filesystem not None, - Schema schema=None, columns=None, filter=None, Expression partition_expression=ScalarExpression(True), row_groups=None): cdef: - shared_ptr[CScanOptions] c_options shared_ptr[CFileFragment] c_fragment vector[int] c_row_groups if row_groups is None: - return super().make_fragment(path, filesystem, schema, columns, - filter, partition_expression) - for row_group in set(row_groups): - c_row_groups.push_back( row_group) + return super().make_fragment(path, filesystem, partition_expression) - if schema is None: - schema = self.inspect(path, filesystem) - c_options = _make_scan_options(schema, partition_expression, - columns, filter) + for row_group in set(row_groups): + c_row_groups.push_back( row_group) c_fragment = GetResultValue( self.parquet_format.MakeFragment(CFileSource(tobytes(path), filesystem.unwrap() .get()), - move(c_options), partition_expression.unwrap(), move(c_row_groups))) return Fragment.wrap( move(c_fragment)) @@ -1409,13 +1402,17 @@ cdef class Scanner: def _from_fragment(Fragment fragment not None, bint use_threads=True, MemoryPool memory_pool=None): cdef: + shared_ptr[CScanOptions] options shared_ptr[CScanContext] context + # TODO(ARROW-8065) + options = CScanOptions.Make(pyarrow_unwrap_schema(fragment.schema)) + context = make_shared[CScanContext]() context.get().pool = maybe_unbox_memory_pool(memory_pool) context.get().use_threads = use_threads - return Scanner.wrap(make_shared[CScanner](fragment.unwrap(), context)) + return Scanner.wrap(make_shared[CScanner](fragment.unwrap(), options, context)) cdef inline shared_ptr[CScanner] unwrap(self): return self.wrapped @@ -1443,6 +1440,20 @@ cdef class Scanner: else: yield ScanTask.wrap(task) + def to_batches(self): + """Consume a Scanner in record batches. + + Sequentially executes the ScanTasks as the returned generator gets + consumed. + + Returns + ------- + record_batches : iterator of RecordBatch + """ + for task in self.scan(): + for batch in task.execute(): + yield batch + def to_table(self): """Convert a Scanner into a Table. 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index cd96d40d466..93f79ca89b2 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -137,6 +137,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScanOptions "arrow::dataset::ScanOptions": CRecordBatchProjector projector + @staticmethod + shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) + cdef cppclass CScanContext "arrow::dataset::ScanContext": c_bool use_threads @@ -149,8 +152,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[CRecordBatchIterator] Execute() cdef cppclass CFragment "arrow::dataset::Fragment": - CResult[CScanTaskIterator] Scan(shared_ptr[CScanContext] context) - const shared_ptr[CSchema]& schema() const + CResult[CScanTaskIterator] Scan( + shared_ptr[CScanOptions] options, shared_ptr[CScanContext] context) + const shared_ptr[CSchema]& physical_schema() const c_bool splittable() const c_string type_name() const const shared_ptr[CExpression]& partition_expression() const @@ -162,7 +166,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::FragmentIterator" cdef cppclass CScanner "arrow::dataset::Scanner": - CScanner(shared_ptr[CFragment], shared_ptr[CScanContext]) + CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions], shared_ptr[CScanContext]) CResult[CScanTaskIterator] Scan() CResult[shared_ptr[CTable]] ToTable() CFragmentIterator GetFragments() @@ -232,7 +236,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[shared_ptr[CSchema]] Inspect(const CFileSource&) const CResult[shared_ptr[CFileFragment]] MakeFragment( CFileSource source, - shared_ptr[CScanOptions] options, shared_ptr[CExpression] partition_expression) cdef cppclass CFileFragment "arrow::dataset::FileFragment"( @@ -272,7 +275,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: shared_ptr[CExpression] extra_filter) CResult[shared_ptr[CFileFragment]] MakeFragment( CFileSource source, - shared_ptr[CScanOptions] options, shared_ptr[CExpression] partition_expression, vector[int] row_groups) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 8d0a21e02eb..bfec7e256d4 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -271,10 +271,11 @@ def test_filesystem_dataset(mockfs): assert row_group_fragments[0].row_groups == {0} # test predicate pushdown using row group metadata - fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) - assert len(fragments) == 2 - assert len(list(fragments[0].get_row_group_fragments())) == 1 - assert len(list(fragments[1].get_row_group_fragments())) == 0 + # ARROW-8065 + # fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) + # assert len(fragments) == 2 + # assert len(list(fragments[0].get_row_group_fragments())) == 1 + # assert len(list(fragments[1].get_row_group_fragments())) == 0 def test_dataset(dataset): @@ -671,9 +672,10 @@ def test_fragments(tempdir): f = fragments[0] # file's schema does not include partition column - phys_schema = f.schema.remove(f.schema.get_field_index('part')) - assert f.format.inspect(f.path, f.filesystem) == phys_schema - assert f.partition_expression.equals(ds.field('part') == 'a') + # TODO(ARROW-8065) + # phys_schema = f.schema.remove(f.schema.get_field_index('part')) + # assert 
f.format.inspect(f.path, f.filesystem) == phys_schema + # assert f.partition_expression.equals(ds.field('part') == 'a') # scanning fragment includes partition columns result = f.to_table() @@ -716,7 +718,7 @@ def assert_yields_projected(fragment, row_slice, columns): # manually re-construct a fragment, with explicit schema new_fragment = parquet_format.make_fragment( - fragment.path, fragment.filesystem, schema=dataset.schema, + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression) assert new_fragment.to_table().equals(fragment.to_table()) assert_yields_projected(new_fragment, (0, 4), table.column_names) From 8bdb53792b6edfda908479308ba2ccc45e71309d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Wed, 15 Apr 2020 14:21:49 -0400 Subject: [PATCH 2/4] Refactor GetFragments(ScanOptions) to GetFragments(Expression) --- cpp/src/arrow/dataset/dataset.cc | 62 ++++++-------------- cpp/src/arrow/dataset/dataset.h | 19 +++--- cpp/src/arrow/dataset/dataset_internal.h | 6 +- cpp/src/arrow/dataset/discovery_test.cc | 2 +- cpp/src/arrow/dataset/file_base.cc | 7 +-- cpp/src/arrow/dataset/file_base.h | 2 +- cpp/src/arrow/dataset/file_ipc_test.cc | 4 +- cpp/src/arrow/dataset/file_parquet.cc | 3 +- cpp/src/arrow/dataset/file_test.cc | 42 ++++++------- cpp/src/arrow/dataset/filter.h | 5 ++ cpp/src/arrow/dataset/scanner.cc | 2 +- cpp/src/arrow/dataset/test_util.h | 2 +- python/pyarrow/_dataset.pyx | 24 ++++++-- python/pyarrow/includes/libarrow_dataset.pxd | 2 + python/pyarrow/tests/test_dataset.py | 17 +++--- 15 files changed, 91 insertions(+), 108 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index f4dca0b1c26..e2e2fb8a51c 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -52,9 +52,16 @@ Result InMemoryFragment::Scan(std::shared_ptr opt // multiple times. 
auto batches_it = MakeVectorIterator(record_batches_); + auto batch_size = options->batch_size; // RecordBatch -> ScanTask auto fn = [=](std::shared_ptr batch) -> std::shared_ptr { - RecordBatchVector batches{batch}; + RecordBatchVector batches; + + auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size); + for (int i = 0; i < n_batches; i++) { + batches.push_back(batch->Slice(batch_size * i, batch_size)); + } + return ::arrow::internal::make_unique( std::move(batches), std::move(options), std::move(context)); }; @@ -71,36 +78,15 @@ Result> Dataset::NewScan() { return NewScan(std::make_shared()); } -bool Dataset::AssumePartitionExpression( - const std::shared_ptr& scan_options, - std::shared_ptr* simplified_scan_options) const { - if (partition_expression_ == nullptr) { - if (simplified_scan_options != nullptr) { - *simplified_scan_options = scan_options; - } - return true; - } +FragmentIterator Dataset::GetFragments() { return GetFragments(scalar(true)); } - auto expr = scan_options->filter->Assume(*partition_expression_); - if (expr->IsNull() || expr->Equals(false)) { - // selector is not satisfiable; yield no fragments - return false; +FragmentIterator Dataset::GetFragments(std::shared_ptr predicate) { + if (partition_expression_) { + predicate = predicate->Assume(*partition_expression_); } - if (simplified_scan_options != nullptr) { - auto copy = std::make_shared(*scan_options); - copy->filter = std::move(expr); - *simplified_scan_options = std::move(copy); - } - return true; -} - -FragmentIterator Dataset::GetFragments(std::shared_ptr scan_options) { - std::shared_ptr simplified_scan_options; - if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) { - return MakeEmptyIterator>(); - } - return GetFragmentsImpl(std::move(simplified_scan_options)); + return predicate->IsSatisfiable() ? 
GetFragmentsImpl(std::move(predicate)) + : MakeEmptyIterator>(); } struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator { @@ -140,27 +126,17 @@ Result> InMemoryDataset::ReplaceSchema( return std::make_shared(std::move(schema), get_batches_); } -FragmentIterator InMemoryDataset::GetFragmentsImpl( - std::shared_ptr scan_options) { +FragmentIterator InMemoryDataset::GetFragmentsImpl(std::shared_ptr) { auto schema = this->schema(); auto create_fragment = - [scan_options, - schema](std::shared_ptr batch) -> Result> { + [schema](std::shared_ptr batch) -> Result> { if (!batch->schema()->Equals(schema)) { return Status::TypeError("yielded batch had schema ", *batch->schema(), " which did not match InMemorySource's: ", *schema); } - RecordBatchVector batches; - - auto batch_size = scan_options->batch_size; - auto n_batches = BitUtil::CeilDiv(batch->num_rows(), batch_size); - - for (int i = 0; i < n_batches; i++) { - batches.push_back(batch->Slice(batch_size * i, batch_size)); - } - + RecordBatchVector batches{batch}; return std::make_shared(std::move(batches)); }; @@ -191,8 +167,8 @@ Result> UnionDataset::ReplaceSchema( new UnionDataset(std::move(schema), std::move(children))); } -FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr options) { - return GetFragmentsFromDatasets(children_, options); +FragmentIterator UnionDataset::GetFragmentsImpl(std::shared_ptr predicate) { + return GetFragmentsFromDatasets(children_, predicate); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 758004e33ba..f718c9907f7 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -98,8 +98,9 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { Result> NewScan(std::shared_ptr context); Result> NewScan(); - /// \brief GetFragments returns an iterator of Fragments given ScanOptions. - FragmentIterator GetFragments(std::shared_ptr options); + /// \brief GetFragments returns an iterator of Fragments given a predicate. + FragmentIterator GetFragments(std::shared_ptr predicate); + FragmentIterator GetFragments(); const std::shared_ptr& schema() const { return schema_; } @@ -128,13 +129,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { : schema_(std::move(schema)), partition_expression_(std::move(e)) {} Dataset() = default; - virtual FragmentIterator GetFragmentsImpl(std::shared_ptr options) = 0; - - /// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded - /// fragments. Returns false if the selector is not satisfiable in this Dataset. 
- virtual bool AssumePartitionExpression( - const std::shared_ptr& scan_options, - std::shared_ptr* simplified_scan_options) const; + virtual FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) = 0; std::shared_ptr schema_; std::shared_ptr partition_expression_; @@ -160,13 +155,14 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset { explicit InMemoryDataset(std::shared_ptr table); + std::string type_name() const override { return "in-memory"; } Result> ReplaceSchema( std::shared_ptr schema) const override; protected: - FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) override; std::shared_ptr get_batches_; }; @@ -182,6 +178,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset { static Result> Make(std::shared_ptr schema, DatasetVector children); + const DatasetVector& children() const { return children_; } std::string type_name() const override { return "union"; } @@ -190,7 +187,7 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset { std::shared_ptr schema) const override; protected: - FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) override; explicit UnionDataset(std::shared_ptr schema, DatasetVector children) : Dataset(std::move(schema)), children_(std::move(children)) {} diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 4b349cffc43..f642462e3b1 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -35,13 +35,13 @@ namespace dataset { /// \brief GetFragmentsFromDatasets transforms a vector into a /// flattened FragmentIterator. static inline FragmentIterator GetFragmentsFromDatasets( - const DatasetVector& datasets, std::shared_ptr options) { + const DatasetVector& datasets, std::shared_ptr predicate) { // Iterator auto datasets_it = MakeVectorIterator(datasets); // Dataset -> Iterator - auto fn = [options](std::shared_ptr dataset) -> FragmentIterator { - return dataset->GetFragments(options); + auto fn = [predicate](std::shared_ptr dataset) -> FragmentIterator { + return dataset->GetFragments(predicate); }; // Iterator> diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index f7a3281cb0c..92c0c3fb357 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -150,7 +150,7 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest { } options_ = ScanOptions::Make(schema); ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema)); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), paths); + AssertFragmentsAreFromPath(dataset_->GetFragments(), paths); } protected: diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index d9eedea1f14..0c2a39db564 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -167,7 +167,7 @@ std::shared_ptr FoldingAnd(const std::shared_ptr& l, } FragmentIterator FileSystemDataset::GetFragmentsImpl( - std::shared_ptr scan_options) { + std::shared_ptr predicate) { FragmentVector fragments; ExpressionVector fragment_partitions(forest_.size()); @@ -183,9 +183,8 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl( fragment_partitions[ref.i] = FoldingAnd(partition_expression_, partition); } - // simplify filter by partition information - auto filter = scan_options->filter->Assume(partition); - if 
(filter->IsNull() || filter->Equals(false)) { + auto simplified_predicate = predicate->Assume(partition); + if (!simplified_predicate->IsSatisfiable()) { // directories (and descendants) which can't satisfy the filter are pruned return fs::PathForest::Prune; } diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index d19fa945baa..ae3b4ace21e 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -256,7 +256,7 @@ class ARROW_DS_EXPORT FileSystemDataset : public Dataset { std::string ToString() const; protected: - FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FragmentIterator GetFragmentsImpl(std::shared_ptr predicate) override; FileSystemDataset(std::shared_ptr schema, std::shared_ptr root_partition, diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index 7d04126e0fa..1247eb109e7 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -187,8 +187,8 @@ TEST_F(TestIpcFileSystemDataset, Write) { opts_ = ScanOptions::Make(schema); auto partitioning_factory = DirectoryPartitioning::MakeFactory({"str", "i32"}); - ASSERT_OK_AND_ASSIGN(auto plan, partitioning_factory->MakeWritePlan( - schema, dataset_->GetFragments(options_))); + ASSERT_OK_AND_ASSIGN( + auto plan, partitioning_factory->MakeWritePlan(schema, dataset_->GetFragments())); plan.format = format_; plan.filesystem = fs_; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 7daf1400976..0f3548b42be 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -220,8 +220,7 @@ class RowGroupSkipper { } auto stats_expr = maybe_stats_expr.ValueOrDie(); - auto expr = filter_->Assume(stats_expr); - return (expr->IsNull() || expr->Equals(false)); + return !filter_->Assume(stats_expr)->IsSatisfiable(); } std::shared_ptr metadata_; diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 699ba989587..ee2bdc5cee2 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -80,15 +80,15 @@ TEST(FileSource, BufferBased) { TEST_F(TestFileSystemDataset, Basic) { MakeDataset({}); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {}); MakeDataset({fs::File("a"), fs::File("b"), fs::File("c")}); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b", "c"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {"a", "b", "c"}); AssertFilesAre(dataset_, {"a", "b", "c"}); // Should not create fragment from directories. MakeDataset({fs::Dir("A"), fs::Dir("A/B"), fs::File("A/a"), fs::File("A/B/b")}); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"A/a", "A/B/b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {"A/a", "A/B/b"}); AssertFilesAre(dataset_, {"A/a", "A/B/b"}); } @@ -121,27 +121,22 @@ TEST_F(TestFileSystemDataset, RootPartitionPruning) { MakeDataset({fs::File("a"), fs::File("b")}, root_partition); // Default filter should always return all data. 
- AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(), {"a", "b"}); // filter == partition - options_->filter = root_partition; - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(root_partition), {"a", "b"}); // Same partition key, but non matching filter - options_->filter = ("a"_ == 6).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("a"_ == 6).Copy()), {}); - options_->filter = ("a"_ > 1).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("a"_ > 1).Copy()), {"a", "b"}); // different key shouldn't prune - options_->filter = ("b"_ == 6).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("b"_ == 6).Copy()), {"a", "b"}); // No partition should match MakeDataset({fs::File("a"), fs::File("b")}); - options_->filter = ("b"_ == 6).Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"a", "b"}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("b"_ == 6).Copy()), {"a", "b"}); } TEST_F(TestFileSystemDataset, TreePartitionPruning) { @@ -165,21 +160,20 @@ TEST_F(TestFileSystemDataset, TreePartitionPruning) { std::vector franklins = {"CA/Franklin", "NY/Franklin"}; // Default filter should always return all data. - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), all_cities); + AssertFragmentsAreFromPath(dataset_->GetFragments(), all_cities); // Dataset's partitions are respected - options_->filter = ("country"_ == "US").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), all_cities); - options_->filter = ("country"_ == "FR").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {}); + AssertFragmentsAreFromPath(dataset_->GetFragments(("country"_ == "US").Copy()), + all_cities); + AssertFragmentsAreFromPath(dataset_->GetFragments(("country"_ == "FR").Copy()), {}); - options_->filter = ("state"_ == "CA").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), ca_cities); + AssertFragmentsAreFromPath(dataset_->GetFragments(("state"_ == "CA").Copy()), + ca_cities); // Filter where no decisions can be made on inner nodes when filter don't // apply to inner partitions. - options_->filter = ("city"_ == "Franklin").Copy(); - AssertFragmentsAreFromPath(dataset_->GetFragments(options_), franklins); + AssertFragmentsAreFromPath(dataset_->GetFragments(("city"_ == "Franklin").Copy()), + franklins); } TEST_F(TestFileSystemDataset, FragmentPartitions) { @@ -202,7 +196,7 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) { }; AssertFragmentsHavePartitionExpressions( - dataset_->GetFragments(options_), + dataset_->GetFragments(), { with_root("state"_ == "CA", "city"_ == "San Francisco"), with_root("state"_ == "CA", "city"_ == "Franklin"), diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index e33b904bbfc..26f37b008fe 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -183,6 +183,11 @@ class ARROW_DS_EXPORT Expression { return Assume(*given); } + /// Indicates if the expression is satisfiable. + /// + /// This is a shortcut to check if the expression is neither null nor false. 
+ bool IsSatisfiable() const { return !IsNull() && !Equals(false); } + /// returns a debug string representing this expression virtual std::string ToString() const = 0; diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 199571d2eb1..eb1152ba269 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -73,7 +73,7 @@ FragmentIterator Scanner::GetFragments() { // Transform Datasets in a flat Iterator. This // iterator is lazily constructed, i.e. Dataset::GetFragments is // not invoked until a Fragment is requested. - return GetFragmentsFromDatasets({dataset_}, scan_options_); + return GetFragmentsFromDatasets({dataset_}, scan_options_->filter); } Result Scanner::Scan() { diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 1ae2decc3f7..8ca055bf3b4 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -135,7 +135,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragments of a dataset. void AssertDatasetFragmentsEqual(RecordBatchReader* expected, Dataset* dataset, bool ensure_drained = true) { - auto it = dataset->GetFragments(options_); + auto it = dataset->GetFragments(options_->filter); ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index dad6e85758c..560bb6f58bf 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -104,21 +104,35 @@ cdef class Dataset: self.dataset.ReplaceSchema(pyarrow_unwrap_schema(schema))) return Dataset.wrap(move(copy)) - def get_fragments(self, columns=None, filter=None): + def get_fragments(self, Expression filter=None): """Returns an iterator over the fragments in this dataset. Parameters ---------- - columns : list of str, default None - List of columns to project. filter : Expression, default None - Scan will return only the rows matching the filter. + Return fragments matching the optional filter, either using the + partition_expression or internal information like Parquet's + statistics. 
Returns ------- fragments : iterator of Fragment """ - return Scanner(self, columns=columns, filter=filter).get_fragments() + cdef: + CFragmentIterator iterator + shared_ptr[CFragment] fragment + + if filter is None or filter.expr == nullptr: + iterator = self.dataset.GetFragments() + else: + iterator = self.dataset.GetFragments(filter.unwrap()) + + while True: + fragment = GetResultValue(iterator.Next()) + if fragment.get() == nullptr: + raise StopIteration() + else: + yield Fragment.wrap(fragment) def _scanner(self, **kwargs): return Scanner(self, **kwargs) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 93f79ca89b2..b3fd1ca17d8 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -188,6 +188,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CDataset "arrow::dataset::Dataset": const shared_ptr[CSchema] & schema() + CFragmentIterator GetFragments() + CFragmentIterator GetFragments(shared_ptr[CExpression] predicate) const shared_ptr[CExpression] & partition_expression() c_string type_name() diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index bfec7e256d4..6a92b5efe94 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -270,12 +270,8 @@ def test_filesystem_dataset(mockfs): assert row_group_fragments[0].path == path assert row_group_fragments[0].row_groups == {0} - # test predicate pushdown using row group metadata - # ARROW-8065 - # fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) - # assert len(fragments) == 2 - # assert len(list(fragments[0].get_row_group_fragments())) == 1 - # assert len(list(fragments[1].get_row_group_fragments())) == 0 + fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) + assert len(fragments) == 2 def test_dataset(dataset): @@ -652,13 +648,14 @@ def _create_dataset_for_fragments(tempdir, chunk_size=None): [range(8), [1] * 8, ['a'] * 4 + ['b'] * 4], names=['f1', 'f2', 'part'] ) + + path = str(tempdir / "test_parquet_dataset") + # write_to_dataset currently requires pandas - pq.write_to_dataset(table, str(tempdir / "test_parquet_dataset"), + pq.write_to_dataset(table, path, partition_cols=["part"], chunk_size=chunk_size) - dataset = ds.dataset(str(tempdir / "test_parquet_dataset/"), - format="parquet", partitioning="hive") - return table, dataset + return table, ds.dataset(path, format="parquet", partitioning="hive") @pytest.mark.pandas From f711826234ca1df2dcf5c19a73a44f28335d8c67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Mon, 20 Apr 2020 16:25:11 -0400 Subject: [PATCH 3/4] Add Fragment::ReadPhysicalSchema --- cpp/src/arrow/dataset/dataset.cc | 18 +-- cpp/src/arrow/dataset/dataset.h | 32 ++-- cpp/src/arrow/dataset/file_base.cc | 4 + cpp/src/arrow/dataset/file_base.h | 4 +- cpp/src/arrow/dataset/file_ipc.cc | 8 +- cpp/src/arrow/dataset/filter.cc | 27 ++-- cpp/src/arrow/dataset/partition.cc | 8 +- cpp/src/arrow/dataset/scanner.cc | 23 ++- cpp/src/arrow/dataset/scanner.h | 15 +- python/pyarrow/_dataset.pyx | 148 +++++++++++-------- python/pyarrow/includes/libarrow_dataset.pxd | 10 +- python/pyarrow/tests/test_dataset.py | 73 ++++----- 12 files changed, 211 insertions(+), 159 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index e2e2fb8a51c..6e903b1cbcd 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ 
b/cpp/src/arrow/dataset/dataset.cc @@ -30,20 +30,20 @@ namespace arrow { namespace dataset { -Fragment::Fragment(std::shared_ptr physical_schema, - std::shared_ptr partition_expression) - : physical_schema_(std::move(physical_schema)), - partition_expression_(partition_expression ? partition_expression : scalar(true)) { - /// TODO(ARROW-8065) - if (physical_schema_ == nullptr) { - physical_schema_ = schema({}); +Fragment::Fragment(std::shared_ptr partition_expression) + : partition_expression_(partition_expression ? partition_expression : scalar(true)) {} + +Result> InMemoryFragment::ReadPhysicalSchema() { + if (record_batches_.empty()) { + return schema({}); } + + return record_batches_[0]->schema(); } InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, std::shared_ptr partition_expression) - : Fragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(), - std::move(partition_expression)), + : Fragment(std::move(partition_expression)), record_batches_(std::move(record_batches)) {} Result InMemoryFragment::Scan(std::shared_ptr options, diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index f718c9907f7..be42c96a466 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -30,12 +30,21 @@ namespace arrow { namespace dataset { -/// \brief A granular piece of a Dataset, such as an individual file, which can be -/// read/scanned separately from other fragments. +/// \brief A granular piece of a Dataset, such as an individual file. /// -/// A Fragment yields a collection of RecordBatch, encapsulated in one or more ScanTasks. +/// A Fragment can be read/scanned separately from other fragments. It yields a +/// collection of RecordBatch, encapsulated in one or more ScanTasks. +/// +/// A notable difference from Dataset is that Fragments have physical schemas +/// which may differ from Fragments. class ARROW_DS_EXPORT Fragment { public: + /// \brief Return the physical schema of the Fragment. + /// + /// The physical schema is also called the writer schema. + /// This method is blocking and may suffer from high latency filesystem. + virtual Result> ReadPhysicalSchema() = 0; + /// \brief Scan returns an iterator of ScanTasks, each of which yields /// RecordBatches from this Fragment. /// @@ -54,8 +63,6 @@ class ARROW_DS_EXPORT Fragment { virtual std::string type_name() const = 0; - const std::shared_ptr& physical_schema() const { return physical_schema_; } - /// \brief An expression which evaluates to true for all data viewed by this /// Fragment. const std::shared_ptr& partition_expression() const { @@ -65,10 +72,8 @@ class ARROW_DS_EXPORT Fragment { virtual ~Fragment() = default; protected: - Fragment(std::shared_ptr physical_schema = NULLPTR, - std::shared_ptr partition_expression = NULLPTR); + explicit Fragment(std::shared_ptr partition_expression = NULLPTR); - std::shared_ptr physical_schema_; std::shared_ptr partition_expression_; }; @@ -79,6 +84,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { InMemoryFragment(RecordBatchVector record_batches, std::shared_ptr = NULLPTR); + Result> ReadPhysicalSchema() override; + Result Scan(std::shared_ptr options, std::shared_ptr context) override; @@ -90,8 +97,11 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { RecordBatchVector record_batches_; }; -/// \brief A container of zero or more Fragments. A Dataset acts as a discovery mechanism -/// of Fragments and partitions, e.g. files deeply nested in a directory. 
+/// \brief A container of zero or more Fragments. +/// +/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a +/// directory. A Dataset has a schema, also known as the "reader" schema. +/// class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { public: /// \brief Begin to build a new Scan operation against this Dataset @@ -155,7 +165,6 @@ class ARROW_DS_EXPORT InMemoryDataset : public Dataset { explicit InMemoryDataset(std::shared_ptr
table); - std::string type_name() const override { return "in-memory"; } Result> ReplaceSchema( @@ -178,7 +187,6 @@ class ARROW_DS_EXPORT UnionDataset : public Dataset { static Result> Make(std::shared_ptr schema, DatasetVector children); - const DatasetVector& children() const { return children_; } std::string type_name() const override { return "union"; } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 0c2a39db564..a744bb4eb88 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -72,6 +72,10 @@ Result> FileFormat::WriteFragment( return Status::NotImplemented("writing fragment of format ", type_name()); } +Result> FileFragment::ReadPhysicalSchema() { + return format_->Inspect(source_); +} + Result FileFragment::Scan(std::shared_ptr options, std::shared_ptr context) { return format_->ScanFile(source_, std::move(options), std::move(context)); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index ae3b4ace21e..5a60dfd6f83 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -159,6 +159,8 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> ReadPhysicalSchema() override; + Result Scan(std::shared_ptr options, std::shared_ptr context) override; @@ -171,7 +173,7 @@ class ARROW_DS_EXPORT FileFragment : public Fragment { protected: FileFragment(FileSource source, std::shared_ptr format, std::shared_ptr partition_expression) - : Fragment(NULLPTR, std::move(partition_expression)), + : Fragment(std::move(partition_expression)), source_(std::move(source)), format_(std::move(format)) {} diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index abd61f960d4..0ce312eaed5 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -158,7 +158,6 @@ Result IpcFileFormat::ScanFile( return IpcScanTaskIterator::Make(options, context, source); } -namespace internal { class IpcWriteTask : public WriteTask { public: IpcWriteTask(FileSource destination, std::shared_ptr format, @@ -199,15 +198,14 @@ class IpcWriteTask : public WriteTask { std::shared_ptr scan_options_; std::shared_ptr scan_context_; }; -} // namespace internal Result> IpcFileFormat::WriteFragment( FileSource destination, std::shared_ptr fragment, std::shared_ptr scan_options, std::shared_ptr scan_context) { - return std::make_shared( - std::move(destination), shared_from_this(), std::move(fragment), - std::move(scan_options), std::move(scan_context)); + return std::make_shared(std::move(destination), shared_from_this(), + std::move(fragment), std::move(scan_options), + std::move(scan_context)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc index d81501d7e60..6488c0dabf0 100644 --- a/cpp/src/arrow/dataset/filter.cc +++ b/cpp/src/arrow/dataset/filter.cc @@ -49,8 +49,8 @@ namespace arrow { namespace dataset { using arrow::compute::Datum; -using internal::checked_cast; -using internal::checked_pointer_cast; +using arrow::internal::checked_cast; +using arrow::internal::checked_pointer_cast; inline std::shared_ptr NullExpression() { return std::make_shared(std::make_shared()); @@ -655,31 +655,32 @@ std::string ScalarExpression::ToString() const { return value_->ToString() + ":" + type_repr; } +using arrow::internal::JoinStrings; + std::string AndExpression::ToString() const { - return internal::JoinStrings( + return JoinStrings( {"(", left_operand_->ToString(), " 
and ", right_operand_->ToString(), ")"}, ""); } std::string OrExpression::ToString() const { - return internal::JoinStrings( + return JoinStrings( {"(", left_operand_->ToString(), " or ", right_operand_->ToString(), ")"}, ""); } std::string NotExpression::ToString() const { if (operand_->type() == ExpressionType::IS_VALID) { const auto& is_valid = checked_cast(*operand_); - return internal::JoinStrings({"(", is_valid.operand()->ToString(), " is null)"}, ""); + return JoinStrings({"(", is_valid.operand()->ToString(), " is null)"}, ""); } - return internal::JoinStrings({"(not ", operand_->ToString(), ")"}, ""); + return JoinStrings({"(not ", operand_->ToString(), ")"}, ""); } std::string IsValidExpression::ToString() const { - return internal::JoinStrings({"(", operand_->ToString(), " is not null)"}, ""); + return JoinStrings({"(", operand_->ToString(), " is not null)"}, ""); } std::string InExpression::ToString() const { - return internal::JoinStrings( - {"(", operand_->ToString(), " is in ", set_->ToString(), ")"}, ""); + return JoinStrings({"(", operand_->ToString(), " is in ", set_->ToString(), ")"}, ""); } std::string CastExpression::ToString() const { @@ -691,13 +692,13 @@ std::string CastExpression::ToString() const { auto like = arrow::util::get>(to_); to = " like " + like->ToString(); } - return internal::JoinStrings({"(cast ", operand_->ToString(), std::move(to), ")"}, ""); + return JoinStrings({"(cast ", operand_->ToString(), std::move(to), ")"}, ""); } std::string ComparisonExpression::ToString() const { - return internal::JoinStrings({"(", left_operand_->ToString(), " ", OperatorName(op()), - " ", right_operand_->ToString(), ")"}, - ""); + return JoinStrings({"(", left_operand_->ToString(), " ", OperatorName(op()), " ", + right_operand_->ToString(), ")"}, + ""); } bool UnaryExpression::Equals(const Expression& other) const { diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc index 4a08081a4e0..c72ae541e00 100644 --- a/cpp/src/arrow/dataset/partition.cc +++ b/cpp/src/arrow/dataset/partition.cc @@ -42,7 +42,7 @@ namespace dataset { using util::string_view; -using internal::checked_cast; +using arrow::internal::checked_cast; Result> Partitioning::Parse(const std::string& path) const { ExpressionVector expressions; @@ -440,9 +440,9 @@ struct DirectoryPartitioningFactory::MakeWritePlanImpl { // a depth first visitation order WRT their partition expressions. This makes // generation of the full directory tree far simpler since a directory's files are // grouped. 
- auto permutation = internal::ArgSort(right_hand_sides_); - internal::Permute(permutation, &source_fragments_); - internal::Permute(permutation, &right_hand_sides_); + auto permutation = arrow::internal::ArgSort(right_hand_sides_); + arrow::internal::Permute(permutation, &source_fragments_); + arrow::internal::Permute(permutation, &right_hand_sides_); // the basename of out.paths[i] is stored in segments[i] (full paths will be assembled // after segments is complete) diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index eb1152ba269..9f017678260 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -94,9 +94,18 @@ Result ScanTaskIteratorFromRecordBatch( ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context) : dataset_(std::move(dataset)), + fragment_(nullptr), scan_options_(ScanOptions::Make(dataset_->schema())), scan_context_(std::move(scan_context)) {} +ScannerBuilder::ScannerBuilder(std::shared_ptr schema, + std::shared_ptr fragment, + std::shared_ptr scan_context) + : dataset_(nullptr), + fragment_(std::move(fragment)), + scan_options_(ScanOptions::Make(schema)), + scan_context_(std::move(scan_context)) {} + Status ScannerBuilder::Project(std::vector columns) { RETURN_NOT_OK(schema()->CanReferenceFieldsByNames(columns)); has_projection_ = true; @@ -139,15 +148,21 @@ Result> ScannerBuilder::Finish() const { scan_options->evaluator = std::make_shared(); } + if (dataset_ == nullptr) { + return std::make_shared(fragment_, std::move(scan_options), scan_context_); + } + return std::make_shared(dataset_, std::move(scan_options), scan_context_); } -std::shared_ptr ScanContext::TaskGroup() const { +using arrow::internal::TaskGroup; + +std::shared_ptr ScanContext::TaskGroup() const { if (use_threads) { - internal::ThreadPool* thread_pool = arrow::internal::GetCpuThreadPool(); - return internal::TaskGroup::MakeThreaded(thread_pool); + auto* thread_pool = arrow::internal::GetCpuThreadPool(); + return TaskGroup::MakeThreaded(thread_pool); } - return internal::TaskGroup::MakeSerial(); + return TaskGroup::MakeSerial(); } Result> Scanner::ToTable() { diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index a41af106c44..238b8d0c490 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -143,13 +143,12 @@ ARROW_DS_EXPORT Result ScanTaskIteratorFromRecordBatch( /// \brief Scanner is a materialized scan operation with context and options /// bound. A scanner is the class that glues ScanTask, Fragment, -/// and Source. In python pseudo code, it performs the following: +/// and Dataset. In python pseudo code, it performs the following: /// /// def Scan(): -/// for source in this.sources_: -/// for fragment in source.GetFragments(this.options_): -/// for scan_task in fragment.Scan(this.context_): -/// yield scan_task +/// for fragment in self.dataset.GetFragments(this.options.filter): +/// for scan_task in fragment.Scan(this.options): +/// yield scan_task class ARROW_DS_EXPORT Scanner { public: Scanner(std::shared_ptr dataset, std::shared_ptr scan_options, @@ -200,6 +199,9 @@ class ARROW_DS_EXPORT ScannerBuilder { ScannerBuilder(std::shared_ptr dataset, std::shared_ptr scan_context); + ScannerBuilder(std::shared_ptr schema, std::shared_ptr fragment, + std::shared_ptr scan_context); + /// \brief Set the subset of columns to materialize. /// /// This subset will be passed down to Sources and corresponding Fragments. 
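As a rough illustration of the fragment-scoped ScannerBuilder constructor introduced above: in the Python bindings added later in this same series, a single fragment can be scanned directly through Scanner.from_fragment. The directory path below is hypothetical and the column name f1 is borrowed from this patch's test fixtures; treat this as a sketch under those assumptions, not canonical usage.

    import pyarrow.dataset as ds

    dataset = ds.dataset("/tmp/test_parquet_dataset", format="parquet",
                         partitioning="hive")
    fragment = next(iter(dataset.get_fragments()))
    # Scan one fragment against the dataset's reader schema so the partition
    # column is materialized alongside the file's physical columns.
    scanner = ds.Scanner.from_fragment(fragment, schema=dataset.schema,
                                       filter=ds.field("f1") < 2)
    table = scanner.to_table()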
@@ -241,10 +243,11 @@ class ARROW_DS_EXPORT ScannerBuilder { /// \brief Return the constructed now-immutable Scanner object Result> Finish() const; - std::shared_ptr schema() const { return dataset_->schema(); } + std::shared_ptr schema() const { return scan_options_->schema(); } private: std::shared_ptr dataset_; + std::shared_ptr fragment_; std::shared_ptr scan_options_; std::shared_ptr scan_context_; bool has_projection_ = false; diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 560bb6f58bf..8267bcfce20 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -135,14 +135,14 @@ cdef class Dataset: yield Fragment.wrap(fragment) def _scanner(self, **kwargs): - return Scanner(self, **kwargs) + return Scanner.from_dataset(self, **kwargs) def scan(self, columns=None, filter=None, MemoryPool memory_pool=None, **kwargs): """Builds a scan operation against the dataset. - It produces a stream of ScanTasks which is meant to be a unit of work to - be dispatched. The tasks are not executed automatically, the user is + It produces a stream of ScanTasks which is meant to be a unit of work + to be dispatched. The tasks are not executed automatically, the user is responsible to execute and dispatch the individual tasks, so custom local task scheduling can be implemented. @@ -171,7 +171,7 @@ cdef class Dataset: scan_tasks : iterator of ScanTask """ return self._scanner(columns=columns, filter=filter, - memory_pool=memory_pool, **kwargs).scan() + memory_pool=memory_pool, **kwargs).scan() def to_batches(self, **kwargs): """Read the dataset as materialized record batches. @@ -484,9 +484,14 @@ cdef class Fragment: return self.wrapped @property - def schema(self): - """Return the schema of batches scanned from this Fragment.""" - return pyarrow_wrap_schema(self.fragment.physical_schema()) + def physical_schema(self): + """Return the physical schema of this Fragment. This schema can be + different from the dataset read schema.""" + cdef: + shared_ptr[CSchema] c_schema + + c_schema = GetResultValue(self.fragment.ReadPhysicalSchema()) + return pyarrow_wrap_schema(c_schema) @property def partition_expression(self): @@ -496,14 +501,14 @@ cdef class Fragment: return Expression.wrap(self.fragment.partition_expression()) def _scanner(self, **kwargs): - return Scanner._from_fragment(self, **kwargs) + return Scanner.from_fragment(self, **kwargs) def scan(self, columns=None, filter=None, use_threads=True, MemoryPool memory_pool=None, **kwargs): """Builds a scan operation against the dataset. - It produces a stream of ScanTasks which is meant to be a unit of work to - be dispatched. The tasks are not executed automatically, the user is + It produces a stream of ScanTasks which is meant to be a unit of work + to be dispatched. The tasks are not executed automatically, the user is responsible to execute and dispatch the individual tasks, so custom local task scheduling can be implemented. @@ -532,7 +537,8 @@ cdef class Fragment: scan_tasks : iterator of ScanTask """ return self._scanner(columns=columns, filter=filter, - use_threads=use_threads,memory_pool=memory_pool, **kwargs).scan() + use_threads=use_threads, memory_pool=memory_pool, + **kwargs).scan() def to_batches(self, **kwargs): """Read the fragment as materialized record batches. 
@@ -642,7 +648,8 @@ cdef class ParquetFileFragment(FileFragment): shared_ptr[CExpression] c_extra_filter shared_ptr[CFragment] c_fragment - c_extra_filter = _insert_implicit_casts(extra_filter, self.schema) + schema = self.physical_schema + c_extra_filter = _insert_implicit_casts(extra_filter, schema) c_format = self.file_fragment.format().get() c_iterator = move(GetResultValue(c_format.GetRowGroupFragments(deref( self.parquet_file_fragment), move(c_extra_filter)))) @@ -756,8 +763,8 @@ cdef class ParquetFileFormat(FileFormat): vector[int] c_row_groups if row_groups is None: - return super().make_fragment(path, filesystem, partition_expression) - + return super().make_fragment(path, filesystem, + partition_expression) for row_group in set(row_groups): c_row_groups.push_back( row_group) @@ -1328,6 +1335,35 @@ cdef class ScanTask: yield pyarrow_wrap_batch(record_batch) +cdef shared_ptr[CScanContext] _build_scan_context(bint use_threads=True, + MemoryPool memory_pool=None): + cdef: + shared_ptr[CScanContext] context + + context = make_shared[CScanContext]() + context.get().pool = maybe_unbox_memory_pool(memory_pool) + if use_threads is not None: + context.get().use_threads = use_threads + + return context + + +cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, + list columns=None, Expression filter=None, + int batch_size=32*2**10) except *: + cdef: + CScannerBuilder *builder + + builder = ptr.get() + if columns is not None: + check_status(builder.Project([tobytes(c) for c in columns])) + + check_status(builder.Filter(_insert_implicit_casts( + filter, pyarrow_wrap_schema(builder.schema())))) + + check_status(builder.BatchSize(batch_size)) + + cdef class Scanner: """A materialized scan operation with context and options bound. @@ -1368,39 +1404,8 @@ cdef class Scanner: shared_ptr[CScanner] wrapped CScanner* scanner - def __init__(self, Dataset dataset, list columns=None, - Expression filter=None, bint use_threads=True, - int batch_size=32*2**10, MemoryPool memory_pool=None): - cdef: - shared_ptr[CScanContext] context - shared_ptr[CScannerBuilder] builder - shared_ptr[CExpression] filter_expression - vector[c_string] columns_to_project - - # create scan context - context = make_shared[CScanContext]() - context.get().pool = maybe_unbox_memory_pool(memory_pool) - if use_threads is not None: - context.get().use_threads = use_threads - - # create scanner builder - builder = GetResultValue( - dataset.unwrap().get().NewScanWithContext(context) - ) - - # set the builder's properties - if columns is not None: - check_status(builder.get().Project([tobytes(c) for c in columns])) - - check_status(builder.get().Filter(_insert_implicit_casts( - filter, pyarrow_wrap_schema(builder.get().schema()) - ))) - - check_status(builder.get().BatchSize(batch_size)) - - # instantiate the scanner object - scanner = GetResultValue(builder.get().Finish()) - self.init(scanner) + def __init__(self): + _forbid_instantiation(self.__class__) cdef void init(self, const shared_ptr[CScanner]& sp): self.wrapped = sp @@ -1412,24 +1417,51 @@ cdef class Scanner: self.init(sp) return self + cdef inline shared_ptr[CScanner] unwrap(self): + return self.wrapped + @staticmethod - def _from_fragment(Fragment fragment not None, bint use_threads=True, - MemoryPool memory_pool=None): + def from_dataset(Dataset dataset not None, + bint use_threads=True, MemoryPool memory_pool=None, + list columns=None, Expression filter=None, + int batch_size=32*2**10): cdef: - shared_ptr[CScanOptions] options shared_ptr[CScanContext] context + 
shared_ptr[CScannerBuilder] builder + shared_ptr[CScanner] scanner - # TODO(ARROW-8065) - options = CScanOptions.Make(pyarrow_unwrap_schema(fragment.schema)) + context = _build_scan_context(use_threads=use_threads, + memory_pool=memory_pool) + builder = make_shared[CScannerBuilder](dataset.unwrap(), context) + _populate_builder(builder, columns=columns, filter=filter, + batch_size=batch_size) - context = make_shared[CScanContext]() - context.get().pool = maybe_unbox_memory_pool(memory_pool) - context.get().use_threads = use_threads + scanner = GetResultValue(builder.get().Finish()) + return Scanner.wrap(scanner) - return Scanner.wrap(make_shared[CScanner](fragment.unwrap(), options, context)) + @staticmethod + def from_fragment(Fragment fragment not None, Schema schema=None, + bint use_threads=True, MemoryPool memory_pool=None, + list columns=None, Expression filter=None, + int batch_size=32*2**10): + cdef: + shared_ptr[CScanContext] context + shared_ptr[CScannerBuilder] builder + shared_ptr[CScanner] scanner - cdef inline shared_ptr[CScanner] unwrap(self): - return self.wrapped + context = _build_scan_context(use_threads=use_threads, + memory_pool=memory_pool) + + if schema is None: + schema = fragment.physical_schema + + builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema), + fragment.unwrap(), context) + _populate_builder(builder, columns=columns, filter=filter, + batch_size=batch_size) + + scanner = GetResultValue(builder.get().Finish()) + return Scanner.wrap(scanner) def scan(self): """Returns a stream of ScanTasks diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index b3fd1ca17d8..bc2ef32659c 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -140,7 +140,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: @staticmethod shared_ptr[CScanOptions] Make(shared_ptr[CSchema] schema) - cdef cppclass CScanContext "arrow::dataset::ScanContext": c_bool use_threads CMemoryPool * pool @@ -152,9 +151,9 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CResult[CRecordBatchIterator] Execute() cdef cppclass CFragment "arrow::dataset::Fragment": + CResult[shared_ptr[CSchema]] ReadPhysicalSchema() CResult[CScanTaskIterator] Scan( shared_ptr[CScanOptions] options, shared_ptr[CScanContext] context) - const shared_ptr[CSchema]& physical_schema() const c_bool splittable() const c_string type_name() const const shared_ptr[CExpression]& partition_expression() const @@ -166,7 +165,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::FragmentIterator" cdef cppclass CScanner "arrow::dataset::Scanner": - CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions], shared_ptr[CScanContext]) + CScanner(shared_ptr[CDataset], shared_ptr[CScanOptions], + shared_ptr[CScanContext]) + CScanner(shared_ptr[CFragment], shared_ptr[CScanOptions], + shared_ptr[CScanContext]) CResult[CScanTaskIterator] Scan() CResult[shared_ptr[CTable]] ToTable() CFragmentIterator GetFragments() @@ -175,6 +177,8 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CScannerBuilder "arrow::dataset::ScannerBuilder": CScannerBuilder(shared_ptr[CDataset], shared_ptr[CScanContext] scan_context) + CScannerBuilder(shared_ptr[CSchema], shared_ptr[CFragment], + shared_ptr[CScanContext] scan_context) CStatus Project(const vector[c_string]& columns) CStatus Filter(const CExpression& filter) CStatus 
Filter(shared_ptr[CExpression] filter) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 6a92b5efe94..1fad1be8d09 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -295,8 +295,7 @@ def test_dataset(dataset): assert len(table) == 10 condition = ds.field('i64') == 1 - scanner = ds.Scanner(dataset, use_threads=True, filter=condition) - result = scanner.to_table().to_pydict() + result = dataset.to_table(use_threads=True, filter=condition).to_pydict() # don't rely on the scanning order assert result['i64'] == [1, 1] @@ -306,15 +305,16 @@ def test_dataset(dataset): def test_scanner(dataset): - scanner = ds.Scanner(dataset, memory_pool=pa.default_memory_pool()) + scanner = ds.Scanner.from_dataset(dataset, + memory_pool=pa.default_memory_pool()) assert isinstance(scanner, ds.Scanner) assert len(list(scanner.scan())) == 2 with pytest.raises(pa.ArrowInvalid): - dataset.scan(columns=['unknown']) + ds.Scanner.from_dataset(dataset, columns=['unknown']) - scanner = ds.Scanner(dataset, columns=['i64'], - memory_pool=pa.default_memory_pool()) + scanner = ds.Scanner.from_dataset(dataset, columns=['i64'], + memory_pool=pa.default_memory_pool()) assert isinstance(scanner, ds.Scanner) assert len(list(scanner.scan())) == 2 @@ -598,7 +598,7 @@ def test_filesystem_factory(mockfs, paths_or_selector): assert isinstance(dataset, ds.FileSystemDataset) assert len(list(dataset.scan())) == 2 - scanner = ds.Scanner(dataset) + scanner = ds.Scanner.from_dataset(dataset) expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) expected_str = pa.DictionaryArray.from_arrays( @@ -669,45 +669,33 @@ def test_fragments(tempdir): f = fragments[0] # file's schema does not include partition column - # TODO(ARROW-8065) - # phys_schema = f.schema.remove(f.schema.get_field_index('part')) - # assert f.format.inspect(f.path, f.filesystem) == phys_schema - # assert f.partition_expression.equals(ds.field('part') == 'a') + assert f.physical_schema.names == ['f1', 'f2'] + assert f.format.inspect(f.path, f.filesystem) == f.physical_schema + assert f.partition_expression.equals(ds.field('part') == 'a') # scanning fragment includes partition columns - result = f.to_table() - assert f.schema == result.schema + result = f.to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] - assert len(result) == 4 assert result.equals(table.slice(0, 4)) - - # scanning fragments follow column projection - fragments = list(dataset.get_fragments(columns=['f1', 'part'])) - assert len(fragments) == 2 - result = fragments[0].to_table() - assert result.column_names == ['f1', 'part'] - assert len(result) == 4 + assert f.physical_schema == result.schema.remove(2) # scanning fragments follow filter predicate - fragments = list(dataset.get_fragments(filter=ds.field('f1') < 2)) - assert len(fragments) == 2 - result = fragments[0].to_table() + result = f.to_table(schema=dataset.schema, filter=ds.field('f1') < 2) assert result.column_names == ['f1', 'f2', 'part'] - assert len(result) == 2 - result = fragments[1].to_table() - assert len(result) == 0 +@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_reconstruct(tempdir): table, dataset = _create_dataset_for_fragments(tempdir) - def assert_yields_projected(fragment, row_slice, columns): - actual = fragment.to_table() - assert actual.column_names == columns + def assert_yields_projected(fragment, 
row_slice, schema): + actual = fragment.to_table(schema=schema) + assert actual.schema == schema.schema - expected = table.slice(*row_slice).to_pandas()[[*columns]] + names = schema.names + expected = table.slice(*row_slice).to_pandas()[[*names]] assert actual.equals(pa.Table.from_pandas(expected)) fragment = list(dataset.get_fragments())[0] @@ -749,6 +737,7 @@ def assert_yields_projected(fragment, row_slice, columns): partition_expression=fragment.partition_expression) +@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups(tempdir): @@ -759,21 +748,19 @@ def test_fragments_parquet_row_groups(tempdir): # list and scan row group fragments row_group_fragments = list(fragment.get_row_group_fragments()) assert len(row_group_fragments) == 2 - result = row_group_fragments[0].to_table() + result = row_group_fragments[0].to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] assert len(result) == 2 assert result.equals(table.slice(0, 2)) - # scanning row group fragment follows column projection / filter predicate - fragment = list(dataset.get_fragments( - columns=['part', 'f1'], filter=ds.field('f1') < 1))[0] + fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0] row_group_fragments = list(fragment.get_row_group_fragments()) assert len(row_group_fragments) == 1 - result = row_group_fragments[0].to_table() - assert result.column_names == ['part', 'f1'] + result = row_group_fragments[0].to_table(filter=ds.field('f1') < 1) assert len(result) == 1 +@pytest.mark.skip(reason="ARROW-8318") @pytest.mark.pandas @pytest.mark.parquet def test_fragments_parquet_row_groups_reconstruct(tempdir): @@ -785,7 +772,7 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): # manually re-construct row group fragments new_fragment = parquet_format.make_fragment( - fragment.path, fragment.filesystem, schema=dataset.schema, + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression, row_groups=[0]) result = new_fragment.to_table() @@ -793,11 +780,11 @@ def test_fragments_parquet_row_groups_reconstruct(tempdir): # manually re-construct a row group fragment with filter/column projection new_fragment = parquet_format.make_fragment( - fragment.path, fragment.filesystem, schema=dataset.schema, - columns=['f1', 'part'], filter=ds.field('f1') < 3, + fragment.path, fragment.filesystem, partition_expression=fragment.partition_expression, row_groups={1}) - result = new_fragment.to_table() + result = new_fragment.to_table(columns=['f1', 'part'], + filter=ds.field('f1') < 3, ) assert result.column_names == ['f1', 'part'] assert len(result) == 1 @@ -1271,9 +1258,7 @@ def test_filter_implicit_cast(tempdir): dataset = ds.dataset(str(path)) filter_ = ds.field('a') > 2 - scanner = ds.Scanner(dataset, filter=filter_) - result = scanner.to_table() - assert len(result) == 3 + assert len(dataset.to_table(filter=filter_)) == 3 def test_dataset_union(multisourcefs): From 099a8c60f4cbfe6f6c56ae8d90377a085e020a58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Wed, 22 Apr 2020 15:18:38 -0400 Subject: [PATCH 4/4] Update according to review --- cpp/src/arrow/dataset/dataset.cc | 17 ++-- cpp/src/arrow/dataset/dataset.h | 17 ++-- cpp/src/arrow/dataset/file_parquet.cc | 6 +- cpp/src/arrow/dataset/file_parquet.h | 10 ++- cpp/src/arrow/dataset/file_parquet_test.cc | 2 +- python/pyarrow/_dataset.pyx | 100 ++++++++------------- python/pyarrow/tests/test_dataset.py | 11 ++- 7 files 
changed, 77 insertions(+), 86 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 6e903b1cbcd..d80165d6f66 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -33,19 +33,20 @@ namespace dataset { Fragment::Fragment(std::shared_ptr partition_expression) : partition_expression_(partition_expression ? partition_expression : scalar(true)) {} -Result> InMemoryFragment::ReadPhysicalSchema() { - if (record_batches_.empty()) { - return schema({}); - } - - return record_batches_[0]->schema(); -} +Result> InMemoryFragment::ReadPhysicalSchema() { return schema_; } -InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, +InMemoryFragment::InMemoryFragment(std::shared_ptr schema, + RecordBatchVector record_batches, std::shared_ptr partition_expression) : Fragment(std::move(partition_expression)), + schema_(std::move(schema)), record_batches_(std::move(record_batches)) {} +InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, + std::shared_ptr partition_expression) + : InMemoryFragment(record_batches.empty() ? schema({}) : record_batches[0]->schema(), + std::move(record_batches), std::move(partition_expression)) {} + Result InMemoryFragment::Scan(std::shared_ptr options, std::shared_ptr context) { // Make an explicit copy of record_batches_ to ensure Scan can be called diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index be42c96a466..6a82d529fb8 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -33,10 +33,12 @@ namespace dataset { /// \brief A granular piece of a Dataset, such as an individual file. /// /// A Fragment can be read/scanned separately from other fragments. It yields a -/// collection of RecordBatch, encapsulated in one or more ScanTasks. +/// collection of RecordBatches when scanned, encapsulated in one or more +/// ScanTasks. /// -/// A notable difference from Dataset is that Fragments have physical schemas -/// which may differ from Fragments. +/// Note that Fragments have well defined physical schemas which are reconciled by +/// the Datasets which contain them; these physical schemas may differ from a parent +/// Dataset's schema and the physical schemas of sibling Fragments. class ARROW_DS_EXPORT Fragment { public: /// \brief Return the physical schema of the Fragment. @@ -81,8 +83,10 @@ class ARROW_DS_EXPORT Fragment { /// RecordBatch. class ARROW_DS_EXPORT InMemoryFragment : public Fragment { public: - InMemoryFragment(RecordBatchVector record_batches, + InMemoryFragment(std::shared_ptr schema, RecordBatchVector record_batches, std::shared_ptr = NULLPTR); + explicit InMemoryFragment(RecordBatchVector record_batches, + std::shared_ptr = NULLPTR); Result> ReadPhysicalSchema() override; @@ -94,14 +98,15 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { std::string type_name() const override { return "in-memory"; } protected: + std::shared_ptr schema_; RecordBatchVector record_batches_; }; /// \brief A container of zero or more Fragments. /// /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a -/// directory. A Dataset has a schema, also known as the "reader" schema. -/// +/// directory. A Dataset has a schema to which Fragments must align during a +/// scan operation. This is analogous to Avro's reader and writer schema. 
 class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
  public:
   /// \brief Begin to build a new Scan operation against this Dataset
diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index 0f3548b42be..0c895784ff3 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -415,7 +415,7 @@ Result<std::shared_ptr<FileFragment>> ParquetFileFormat::MakeFragment(
 }
 
 Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
-    const ParquetFileFragment& fragment, std::shared_ptr<Expression> extra_filter) {
+    const ParquetFileFragment& fragment, std::shared_ptr<Expression> filter) {
   auto properties = MakeReaderProperties(*this);
   ARROW_ASSIGN_OR_RAISE(auto reader,
                         OpenReader(fragment.source(), std::move(properties)));
@@ -430,8 +430,8 @@ Result<FragmentIterator> ParquetFileFormat::GetRowGroupFragments(
   }
 
   FragmentVector fragments(row_groups.size());
-  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties), extra_filter,
-                          std::move(row_groups));
+  RowGroupSkipper skipper(std::move(metadata), std::move(arrow_properties),
+                          std::move(filter), std::move(row_groups));
 
   for (int i = 0, row_group = skipper.Next();
        row_group != RowGroupSkipper::kIterationDone; row_group = skipper.Next()) {
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 4ce0f8af297..6322e6d39a8 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -108,11 +108,15 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
       std::vector<int> row_groups);
 
   /// \brief Split a ParquetFileFragment into a Fragment for each row group.
-  /// Row groups whose metadata contradicts the fragment's filter or the extra_filter
-  /// will be excluded.
+  ///
+  /// \param[in] fragment the ParquetFileFragment to split.
+  /// \param[in] filter expression used to exclude row groups whose metadata
+  ///            contradicts it.
+  ///
+  /// \return An iterator of Fragments, one per selected row group.
   Result<FragmentIterator> GetRowGroupFragments(
       const ParquetFileFragment& fragment,
-      std::shared_ptr<Expression> extra_filter = scalar(true));
+      std::shared_ptr<Expression> filter = scalar(true));
 };
 
 class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc
index 0a1ec2acbe3..ee65e50382c 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -467,7 +467,7 @@ TEST_F(TestParquetFileFormat, PredicatePushdownRowGroupFragments) {
                            "i64"_ >= int64_t(6));
 
   CountRowGroupsInFragment(fragment, {5, 6, 7},
-                           "i64"_ >= int64_t(6) && "i64"_ < int64_t(8));
+                           "i64"_ >= int64_t(6) and "i64"_ < int64_t(8));
 }
 
 TEST_F(TestParquetFileFormat, ExplicitRowGroupSelection) {
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 8267bcfce20..8477d5e1722 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -137,8 +137,7 @@ cdef class Dataset:
     def _scanner(self, **kwargs):
         return Scanner.from_dataset(self, **kwargs)
 
-    def scan(self, columns=None, filter=None,
-             MemoryPool memory_pool=None, **kwargs):
+    def scan(self, **kwargs):
         """Builds a scan operation against the dataset.
 
         It produces a stream of ScanTasks which is meant to be a unit of work
@@ -159,9 +158,16 @@ cdef class Dataset:
         filter : Expression, default None
             Scan will return only the rows matching the filter.
             If possible the predicate will be pushed down to exploit the
-            partition information or internal metadata found in fragments,
-            e.g. Parquet statistics. Otherwise filters the loaded
+            partition information or internal metadata found in the data
+            source, e.g. Parquet statistics. Otherwise filters the loaded
             RecordBatches before yielding them.
+        batch_size : int, default 32K
+            The maximum row count for scanned record batches. If scanned
+            record batches are overflowing memory then this method can be
+            called to reduce their size.
+        use_threads : bool, default True
+            If enabled, then maximum parallelism will be used, as determined
+            by the number of available CPU cores.
         memory_pool : MemoryPool, default None
             For memory allocations, if required. If not specified, uses the
             default pool.
@@ -170,8 +176,7 @@ cdef class Dataset:
         -------
         scan_tasks : iterator of ScanTask
         """
-        return self._scanner(columns=columns, filter=filter,
-                             memory_pool=memory_pool, **kwargs).scan()
+        return self._scanner(**kwargs).scan()
 
     def to_batches(self, **kwargs):
         """Read the dataset as materialized record batches.
@@ -179,25 +184,7 @@ cdef class Dataset:
         Builds a scan operation against the dataset and sequentially executes
         the ScanTasks as the returned generator gets consumed.
 
-        Parameters
-        ----------
-        columns : list of str, default None
-            List of columns to project. Order and duplicates will be preserved.
-            The columns will be passed down to Datasets and corresponding data
-            fragments to avoid loading, copying, and deserializing columns
-            that will not be required further down the compute chain.
-            By default all of the available columns are projected. Raises
-            an exception if any of the referenced column names does not exist
-            in the dataset's Schema.
-        filter : Expression, default None
-            Scan will return only the rows matching the filter.
-            If possible the predicate will be pushed down to exploit the
-            partition information or internal metadata found in the fragments,
-            e.g. Parquet statistics. Otherwise filters the loaded
-            RecordBatches before yielding them.
-        memory_pool : MemoryPool, default None
-            For memory allocations, if required. If not specified, uses the
-            default pool.
+        See scan method parameters documentation.
 
         Returns
         -------
@@ -211,28 +198,7 @@ cdef class Dataset:
         Note that this method reads all the selected data from the dataset
         into memory.
 
-        Parameters
-        ----------
-        columns : list of str, default None
-            List of columns to project. Order and duplicates will be preserved.
-            The columns will be passed down to Datasets and corresponding data
-            fragments to avoid loading, copying, and deserializing columns
-            that will not be required further down the compute chain.
-            By default all of the available columns are projected. Raises
-            an exception if any of the referenced column names does not exist
-            in the dataset's Schema.
-        filter : Expression, default None
-            Scan will return only the rows matching the filter.
-            If possible the predicate will be pushed down to exploit the
-            partition information or internal metadata found in the fragments,
-            e.g. Parquet statistics. Otherwise filters the loaded
-            RecordBatches before yielding them.
-        use_threads : boolean, default True
-            If enabled, then maximum parallelism will be used determined by
-            the number of available CPU cores.
-        memory_pool : MemoryPool, default None
-            For memory allocations, if required. If not specified, uses the
-            default pool.
+        See scan method parameters documentation.
 
         Returns
         -------
@@ -503,8 +469,7 @@ cdef class Fragment:
     def _scanner(self, **kwargs):
         return Scanner.from_fragment(self, **kwargs)
 
-    def scan(self, columns=None, filter=None, use_threads=True,
-             MemoryPool memory_pool=None, **kwargs):
+    def scan(self, Schema schema=None, **kwargs):
         """Builds a scan operation against the dataset.
 
         It produces a stream of ScanTasks which is meant to be a unit of work
         to be dispatched.
 
         Parameters
         ----------
+        schema : Schema
+            Schema to use for scanning. This is used to unify a Fragment to
+            its Dataset's schema. If not specified, this will use the
+            Fragment's physical schema, which might differ for each Fragment.
         columns : list of str, default None
             List of columns to project. Order and duplicates will be preserved.
             The columns will be passed down to Datasets and corresponding data
@@ -525,9 +494,16 @@ cdef class Fragment:
         filter : Expression, default None
             Scan will return only the rows matching the filter.
             If possible the predicate will be pushed down to exploit the
-            partition information or internal metadata found in fragments,
-            e.g. Parquet statistics. Otherwise filters the loaded
+            partition information or internal metadata found in the data
+            source, e.g. Parquet statistics. Otherwise filters the loaded
             RecordBatches before yielding them.
+        batch_size : int, default 32K
+            The maximum row count for scanned record batches. If scanned
+            record batches are overflowing memory then this method can be
+            called to reduce their size.
+        use_threads : bool, default True
+            If enabled, then maximum parallelism will be used, as determined
+            by the number of available CPU cores.
         memory_pool : MemoryPool, default None
             For memory allocations, if required. If not specified, uses the
             default pool.
@@ -536,34 +512,32 @@ cdef class Fragment:
         -------
         scan_tasks : iterator of ScanTask
         """
-        return self._scanner(columns=columns, filter=filter,
-                             use_threads=use_threads, memory_pool=memory_pool,
-                             **kwargs).scan()
+        return self._scanner(schema=schema, **kwargs).scan()
 
-    def to_batches(self, **kwargs):
+    def to_batches(self, Schema schema=None, **kwargs):
         """Read the fragment as materialized record batches.
 
-        See scan() method arguments.
+        See scan method parameters documentation.
 
         Returns
         -------
         record_batches : iterator of RecordBatch
         """
-        return self._scanner(**kwargs).to_batches()
+        return self._scanner(schema=schema, **kwargs).to_batches()
 
-    def to_table(self, **kwargs):
+    def to_table(self, Schema schema=None, **kwargs):
         """Convert this Fragment into a Table.
 
         Use this convenience utility with care. This will serially materialize
         the Scan result in memory before creating the Table.
 
-        See scan() method arguments.
+        See scan method parameters documentation.
 
         Returns
         -------
         table : Table
         """
-        return self._scanner(**kwargs).to_table()
+        return self._scanner(schema=schema, **kwargs).to_table()
 
 
 cdef class FileFragment(Fragment):
@@ -1388,13 +1362,13 @@ cdef class Scanner:
             partition information or internal metadata found in the data
             source, e.g. Parquet statistics. Otherwise filters the loaded
             RecordBatches before yielding them.
-        use_threads : bool, default True
-            If enabled, then maximum parallelism will be used determined by
-            the number of available CPU cores.
         batch_size : int, default 32K
             The maximum row count for scanned record batches. If scanned
             record batches are overflowing memory then this method can be
            called to reduce their size.
+        use_threads : bool, default True
+            If enabled, then maximum parallelism will be used, as determined
+            by the number of available CPU cores.
         memory_pool : MemoryPool, default None
             For memory allocations, if required. If not specified, uses the
             default pool.
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 1fad1be8d09..951de2adfb6 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -668,12 +668,19 @@ def test_fragments(tempdir):
     assert len(fragments) == 2
     f = fragments[0]
 
+    physical_names = ['f1', 'f2']
     # file's schema does not include partition column
-    assert f.physical_schema.names == ['f1', 'f2']
+    assert f.physical_schema.names == physical_names
     assert f.format.inspect(f.path, f.filesystem) == f.physical_schema
     assert f.partition_expression.equals(ds.field('part') == 'a')
 
-    # scanning fragment includes partition columns
+    # By default, the partition column is not part of the schema.
+    result = f.to_table()
+    assert result.column_names == physical_names
+    assert result.equals(table.remove_column(2).slice(0, 4))
+
+    # scanning fragment includes partition columns when given the proper
+    # schema.
     result = f.to_table(schema=dataset.schema)
     assert result.column_names == ['f1', 'f2', 'part']
     assert result.equals(table.slice(0, 4))
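
As a rough usage sketch of the consolidated scan keywords documented above (columns, filter, batch_size, use_threads), the snippet below builds scan tasks and drives them by hand. It is illustrative only: the path, the hive layout, and the column names are hypothetical, and it assumes ScanTask.execute() yields the task's record batches.

    import pyarrow.dataset as ds

    # Hypothetical partitioned layout: data/part=a/..., data/part=b/...
    dataset = ds.dataset("data", format="parquet", partitioning="hive")

    # scan() forwards every keyword through _scanner() and yields ScanTasks;
    # executing each task is left to the caller.
    tasks = dataset.scan(columns=["f1"],
                         filter=ds.field("part") == "a",
                         batch_size=64 * 1024,
                         use_threads=False)
    for task in tasks:
        for batch in task.execute():  # assumed to yield RecordBatches
            pass  # process each RecordBatch here

    # to_batches() and to_table() accept the same keywords (see the scan docstring).
    table = dataset.to_table(columns=["f1"], filter=ds.field("part") == "a")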
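
The updated test_fragments assertions can also be read as user-facing behavior: a fragment now scans with its physical schema unless the dataset's schema is supplied. A minimal sketch, continuing from the hypothetical dataset above and assuming get_fragments() lists its fragments:

    fragment = next(iter(dataset.get_fragments()))

    # Default: only the columns physically present in the file, no 'part'.
    physical = fragment.to_table()
    assert physical.column_names == fragment.physical_schema.names

    # Passing the dataset's schema unifies the fragment against it, so the
    # 'part' partition column is materialized from the partition expression.
    unified = fragment.to_table(schema=dataset.schema)
    assert "part" in unified.column_names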
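
The same choice exists at the ScanTask level through the new schema parameter on Fragment.scan(); a sketch under the same assumptions:

    # Physical schema by default: tasks read only what the file contains.
    for task in fragment.scan(columns=["f1"]):
        pass  # dispatch or execute the ScanTask

    # With the dataset's schema the tasks are unified against it, so partition
    # columns such as 'part' can be projected as well.
    for task in fragment.scan(schema=dataset.schema, columns=["f1", "part"]):
        pass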