From d66f159895437eb9440694d23b207eb561fd2281 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Aug 2019 16:14:06 -0400 Subject: [PATCH 01/13] add an Expression stub --- cpp/src/arrow/dataset/dataset.cc | 7 +++---- cpp/src/arrow/dataset/dataset.h | 22 ++++++++++++++++------ cpp/src/arrow/dataset/scanner.h | 6 ------ 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 548f90612b3..233ae813bc9 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -46,11 +46,10 @@ Status SimpleDataFragment::Scan(std::shared_ptr scan_context, return Status::OK(); } -Status Dataset::Make(const std::vector>& sources, - const std::shared_ptr& schema, - std::shared_ptr* out) { +Status Dataset::Make(std::vector> sources, + std::shared_ptr schema, std::shared_ptr* out) { // TODO: Ensure schema and sources align. - *out = std::make_shared(sources, schema); + *out = std::make_shared(std::move(sources), std::move(schema)); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 4e75112ad4a..b86faf13ef4 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/dataset/filter.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/iterator.h" @@ -79,9 +80,19 @@ class ARROW_DS_EXPORT DataSource { /// controls filtering and schema inference. virtual DataFragmentIterator GetFragments(std::shared_ptr options) = 0; + /// \brief An expression which evaluates to true for all data viewed by this DataSource. + /// May be null, which indicates no information is available. + const std::shared_ptr& condition() const { return condition_; } + virtual std::string type() const = 0; virtual ~DataSource() = default; + + protected: + DataSource() = default; + explicit DataSource(std::shared_ptr c) : condition_(std::move(c)) {} + + std::shared_ptr condition_; }; /// \brief A DataSource consisting of a flat sequence of DataFragments @@ -107,13 +118,12 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { /// WARNING, this constructor is not recommend, use Dataset::Make instead. /// \param[in] sources one or more input data sources /// \param[in] schema a known schema to conform to, may be nullptr - explicit Dataset(const std::vector>& sources, - const std::shared_ptr& schema) - : schema_(schema), sources_(sources) {} + explicit Dataset(std::vector> sources, + std::shared_ptr schema) + : schema_(std::move(schema)), sources_(std::move(sources)) {} - static Status Make(const std::vector>& sources, - const std::shared_ptr& schema, - std::shared_ptr* out); + static Status Make(std::vector> sourcs, + std::shared_ptr schema, std::shared_ptr* out); /// \brief Begin to build a new Scan operation against this Dataset Status NewScan(std::unique_ptr* out); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 023290cc8b8..d1ece04cdef 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -34,12 +34,6 @@ struct ARROW_DS_EXPORT ScanContext { MemoryPool* pool = arrow::default_memory_pool(); }; -// TODO(wesm): API for handling of post-materialization filters. For -// example, if the user requests [$col1 > 0, $col2 > 0] and $col1 is a -// partition key, but $col2 is not, then the filter "$col2 > 0" must -// be evaluated in-memory against the RecordBatch objects resulting -// from the Scan - class ARROW_DS_EXPORT ScanOptions { public: virtual ~ScanOptions() = default; From 4f5a8bcb1d4731fd2ae078eaaaf152865ebc8dbc Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Aug 2019 17:26:16 -0400 Subject: [PATCH 02/13] rename partitionner_ --- cpp/src/arrow/dataset/file_parquet.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 77c3a71e2d3..f0a9e010035 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -128,7 +128,7 @@ class ParquetScanTaskIterator { } Status Next(ScanTaskPtr* task) { - auto partition = partitionner_.Next(); + auto partition = partitioner_.Next(); // Iteration is done. if (partition.size() == 0) { @@ -155,11 +155,11 @@ class ParquetScanTaskIterator { std::shared_ptr metadata, std::unique_ptr reader) : columns_projection_(columns_projection), - partitionner_(std::move(metadata)), + partitioner_(std::move(metadata)), reader_(std::move(reader)) {} std::vector columns_projection_; - ParquetRowGroupPartitioner partitionner_; + ParquetRowGroupPartitioner partitioner_; std::shared_ptr reader_; }; From b1a6c541f1921a66f7d8c7c8f650d8d7d189f1c5 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Aug 2019 17:34:46 -0400 Subject: [PATCH 03/13] remove unused FileSystemBasedDataSource::options_ --- cpp/src/arrow/dataset/file_base.cc | 12 +++++------ cpp/src/arrow/dataset/file_base.h | 3 --- cpp/src/arrow/dataset/test_util.h | 34 +++++++++++++++++------------- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 20f4944c830..10fb263d1ad 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -45,20 +45,18 @@ Status FileBasedDataFragment::Scan(std::shared_ptr scan_context, return format_->ScanFile(source_, scan_options_, scan_context, out); } -FileSystemBasedDataSource::FileSystemBasedDataSource( - fs::FileSystem* filesystem, const fs::Selector& selector, - std::shared_ptr format, std::shared_ptr scan_options, - std::vector stats) +FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, + const fs::Selector& selector, + std::shared_ptr format, + std::vector stats) : filesystem_(filesystem), selector_(std::move(selector)), format_(std::move(format)), - scan_options_(std::move(scan_options)), stats_(std::move(stats)) {} Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, std::unique_ptr* out) { std::vector stats; RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats)); @@ -71,7 +69,7 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, stats.resize(new_end - stats.begin()); out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format), - std::move(scan_options), std::move(stats))); + std::move(stats))); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index f3add99b409..ec9ead59302 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -170,7 +170,6 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { public: static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, std::unique_ptr* out); std::string type() const override { return "directory"; } @@ -180,13 +179,11 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, std::vector stats); fs::FileSystem* filesystem_ = NULLPTR; fs::Selector selector_; std::shared_ptr format_; - std::shared_ptr scan_options_; std::vector stats_; }; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 74b81b90641..b0d9ff9fa1e 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -163,6 +163,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void SetUp() override { format_ = std::make_shared(); + schema_ = schema({field("dummy", null())}); + options_ = std::make_shared(); ASSERT_OK( TemporaryDir::Make("test-fsdatasource-" + format_->name() + "-", &temp_dir_)); @@ -187,8 +189,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { } void MakeDataSource() { - ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, - std::make_shared(), &source_)); + ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, &source_)); } protected: @@ -197,8 +198,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK( - source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { + ASSERT_OK(source_->GetFragments(options_).Visit( + [&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -218,8 +219,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK( - source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { + ASSERT_OK(source_->GetFragments(options_).Visit( + [&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -242,15 +243,16 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { ASSERT_RAISES( IOError, - source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + source_->GetFragments(options_).Visit( + [&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + auto extension = + fs::internal::GetAbstractPathExtension(file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); } fs::Selector selector_; @@ -259,6 +261,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr fs_; std::unique_ptr temp_dir_; std::shared_ptr format_; + std::shared_ptr schema_; + std::shared_ptr options_; }; template From 955cb563308a659f324808d354af0ac2fb38b648 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 28 Aug 2019 11:07:37 -0400 Subject: [PATCH 04/13] flesh out shim Expression class --- cpp/src/arrow/dataset/file_base.cc | 1 + cpp/src/arrow/dataset/file_parquet.cc | 4 +++- cpp/src/arrow/dataset/filter.h | 1 + 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 10fb263d1ad..aaabca75bb5 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -75,6 +75,7 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, DataFragmentIterator FileSystemBasedDataSource::GetFragments( std::shared_ptr options) { + // TODO(bkietz) examine options.filters vs this->condition struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, std::shared_ptr scan_options, std::vector stats) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index f0a9e010035..20879c8e4c5 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -37,6 +37,7 @@ using RecordBatchReaderPtr = std::unique_ptr; // A set of RowGroup identifiers using RowGroupSet = std::vector; +// TODO(bkietz) refactor this to use ReconcilingRecordBatchReader class ParquetScanTask : public ScanTask { public: static Status Make(RowGroupSet row_groups, const std::vector& columns_projection, @@ -145,7 +146,8 @@ class ParquetScanTaskIterator { static Status InferColumnProjection(const parquet::FileMetaData& metadata, const std::shared_ptr& options, std::vector* out) { - // TODO(fsaintjacques): Compute intersection _and_ validity + // TODO(fsaintjacques): Compute intersection _and_ validity, could probably reuse + // RecordBatchProjector here *out = internal::Iota(metadata.num_columns()); return Status::OK(); diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index b305d4fd3d7..2ea44d75afb 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -21,6 +21,7 @@ #include #include +#include "arrow/compute/kernel.h" #include "arrow/compute/kernels/compare.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" From 949fa7a53239301ebb503e09da46b2c7b34f4925 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 28 Aug 2019 13:56:12 -0400 Subject: [PATCH 05/13] provide basic predicate pushdown to datasources --- cpp/src/arrow/dataset/dataset.cc | 32 +++++++ cpp/src/arrow/dataset/dataset.h | 12 ++- cpp/src/arrow/dataset/file_base.cc | 6 +- cpp/src/arrow/dataset/file_parquet.cc | 2 +- cpp/src/arrow/dataset/file_parquet_test.cc | 4 + cpp/src/arrow/dataset/file_test.cc | 4 + cpp/src/arrow/dataset/filter.cc | 29 +++++++ cpp/src/arrow/dataset/filter.h | 7 ++ cpp/src/arrow/dataset/scanner.h | 19 +++-- cpp/src/arrow/dataset/test_util.h | 99 ++++++++++++---------- cpp/src/arrow/dataset/type_fwd.h | 1 + 11 files changed, 159 insertions(+), 56 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 233ae813bc9..583177cc26b 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -20,6 +20,7 @@ #include #include +#include "arrow/dataset/filter.h" #include "arrow/dataset/scanner.h" #include "arrow/util/stl.h" @@ -60,5 +61,36 @@ Status Dataset::NewScan(std::unique_ptr* out) { return Status::OK(); } +DataFragmentIterator DataSource::AssumeCondition( + std::shared_ptr* options) const { + DCHECK_NE(options, nullptr); + if (*options == nullptr) { + // null scan context; no selector to simplify + return DataFragmentIterator(); + } + + auto c = SelectorAssume((*options)->selector, condition_); + DCHECK_OK(c.status()); + auto expr = std::move(c).ValueOrDie(); + + bool trivial = true; + if (expr->IsNull() || (expr->IsTrivialCondition(&trivial) && !trivial)) { + // don't yield any fragments + return MakeEmptyIterator>(); + } + + *options = std::make_shared(**options); + (*options)->selector = ExpressionSelector(std::move(expr)); + return DataFragmentIterator(); +} + +DataFragmentIterator SimpleDataSource::GetFragments( + std::shared_ptr options) { + if (auto empty = AssumeCondition(&options)) { + return empty; + } + return MakeVectorIterator(fragments_); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index b86faf13ef4..b76c1daca94 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -22,7 +22,6 @@ #include #include -#include "arrow/dataset/filter.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/iterator.h" @@ -84,6 +83,9 @@ class ARROW_DS_EXPORT DataSource { /// May be null, which indicates no information is available. const std::shared_ptr& condition() const { return condition_; } + /// FIXME(bkietz) providing a simple mutator like this is probably not ideal + void condition(std::shared_ptr c) { condition_ = std::move(c); } + virtual std::string type() const = 0; virtual ~DataSource() = default; @@ -92,6 +94,10 @@ class ARROW_DS_EXPORT DataSource { DataSource() = default; explicit DataSource(std::shared_ptr c) : condition_(std::move(c)) {} + /// Mutates a ScanOptions by assuming condition_ holds for all yielded fragments. + /// Returns non-null if context->selector is not satisfiable in this DataSource. + DataFragmentIterator AssumeCondition(std::shared_ptr* options) const; + std::shared_ptr condition_; }; @@ -101,9 +107,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { explicit SimpleDataSource(DataFragmentVector fragments) : fragments_(std::move(fragments)) {} - DataFragmentIterator GetFragments(std::shared_ptr options) override { - return MakeVectorIterator(fragments_); - } + DataFragmentIterator GetFragments(std::shared_ptr options) override; std::string type() const override { return "simple_data_source"; } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index aaabca75bb5..73b4ebf7f47 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -20,6 +20,7 @@ #include #include +#include "arrow/dataset/filter.h" #include "arrow/filesystem/filesystem.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" @@ -75,7 +76,10 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, DataFragmentIterator FileSystemBasedDataSource::GetFragments( std::shared_ptr options) { - // TODO(bkietz) examine options.filters vs this->condition + if (auto empty = AssumeCondition(&options)) { + return empty; + } + struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, std::shared_ptr scan_options, std::vector stats) diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 20879c8e4c5..e6298231b89 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -37,7 +37,7 @@ using RecordBatchReaderPtr = std::unique_ptr; // A set of RowGroup identifiers using RowGroupSet = std::vector; -// TODO(bkietz) refactor this to use ReconcilingRecordBatchReader +// TODO(bkietz) refactor this to use ProjectedRecordBatchReader class ParquetScanTask : public ScanTask { public: static Status Make(RowGroupSet row_groups, const std::vector& columns_projection, diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 8bb333bb718..171d8cee1a7 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -185,5 +185,9 @@ TEST_F(TestParquetFileSystemBasedDataSource, Recursive) { this->Recursive(); } TEST_F(TestParquetFileSystemBasedDataSource, DeletedFile) { this->DeletedFile(); } +TEST_F(TestParquetFileSystemBasedDataSource, PredicatePushDown) { + this->PredicatePushDown(); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index b6063a4f295..19e938a5e8b 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -107,5 +107,9 @@ TEST_F(TestDummyFileSystemBasedDataSource, Recursive) { this->Recursive(); } TEST_F(TestDummyFileSystemBasedDataSource, DeletedFile) { this->DeletedFile(); } +TEST_F(TestDummyFileSystemBasedDataSource, PredicatePushDown) { + this->PredicatePushDown(); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc index e8985dbeb7e..53bbb116218 100644 --- a/cpp/src/arrow/dataset/filter.cc +++ b/cpp/src/arrow/dataset/filter.cc @@ -28,6 +28,7 @@ #include "arrow/compute/context.h" #include "arrow/compute/kernels/boolean.h" #include "arrow/compute/kernels/compare.h" +#include "arrow/dataset/dataset.h" #include "arrow/record_batch.h" #include "arrow/util/logging.h" #include "arrow/visitor_inline.h" @@ -938,5 +939,33 @@ Result> FieldExpression::Validate(const Schema& schema return null(); } +Result> SelectorAssume( + const std::shared_ptr& selector, + const std::shared_ptr& given) { + if (selector == nullptr || selector->filters.size() == 0) { + return ScalarExpression::Make(true); + } + + auto get_expression = [](const std::shared_ptr& f) { + DCHECK_EQ(f->type(), FilterType::EXPRESSION); + return checked_cast(*f).expression(); + }; + + auto out_expr = get_expression(selector->filters[0]); + for (size_t i = 1; i < selector->filters.size(); ++i) { + out_expr = and_(std::move(out_expr), get_expression(selector->filters[i])); + } + + if (given == nullptr) { + return out_expr; + } + return out_expr->Assume(*given); +} + +std::shared_ptr ExpressionSelector(std::shared_ptr e) { + return std::make_shared( + DataSelector{FilterVector{std::make_shared(std::move(e))}}); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/filter.h b/cpp/src/arrow/dataset/filter.h index 2ea44d75afb..364fca0d2d4 100644 --- a/cpp/src/arrow/dataset/filter.h +++ b/cpp/src/arrow/dataset/filter.h @@ -403,5 +403,12 @@ inline FieldExpression operator"" _(const char* name, size_t name_length) { } } // namespace string_literals +ARROW_DS_EXPORT Result> SelectorAssume( + const std::shared_ptr& selector, + const std::shared_ptr& given); + +ARROW_DS_EXPORT std::shared_ptr ExpressionSelector( + std::shared_ptr e); + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index d1ece04cdef..cfb661f4c0a 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -36,18 +36,25 @@ struct ARROW_DS_EXPORT ScanContext { class ARROW_DS_EXPORT ScanOptions { public: - virtual ~ScanOptions() = default; + ScanOptions() = default; - const std::shared_ptr& selector() const { return selector_; } + ScanOptions(std::shared_ptr selector, std::shared_ptr schema, + std::vector> options = {}) + : selector(std::move(selector)), schema(std::move(schema)) {} - const std::shared_ptr& schema() const { return schema_; } + virtual ~ScanOptions() = default; + + MemoryPool* pool() const { return pool_; } - protected: // Filters - std::shared_ptr selector_; + std::shared_ptr selector; // Schema to which record batches will be reconciled - std::shared_ptr schema_; + std::shared_ptr schema; + + MemoryPool* pool_ = default_memory_pool(); + + std::vector> options; }; /// \brief Read record batches from a range of a single data fragment. A diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index b0d9ff9fa1e..30833c5225d 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -15,12 +15,14 @@ // specific language governing permissions and limitations // under the License. +#include #include #include #include #include #include "arrow/dataset/file_base.h" +#include "arrow/dataset/filter.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/path_util.h" #include "arrow/record_batch.h" @@ -162,6 +164,9 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { virtual std::vector file_names() const = 0; void SetUp() override { + selector_.base_dir = "/"; + selector_.recursive = true; + format_ = std::make_shared(); schema_ = schema({field("dummy", null())}); options_ = std::make_shared(); @@ -176,6 +181,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { for (auto path : file_names()) { CreateFile(path, ""); } + + condition_ = ScalarExpression::Make(true); } void CreateFile(std::string path, std::string contents) { @@ -190,69 +197,72 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void MakeDataSource() { ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, &source_)); + source_->condition(condition_); } protected: + std::function fragment)> OpenFragments( + size_t* count) { + return [this, count](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + ++*count; + auto extension = + fs::internal::GetAbstractPathExtension(file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + }; + } + void NonRecursive() { - selector_.base_dir = "/"; + selector_.recursive = false; MakeDataSource(); - int count = 0; - ASSERT_OK(source_->GetFragments(options_).Visit( - [&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - ++count; - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); - + size_t count = 0; + ASSERT_OK(source_->GetFragments(options_).Visit(OpenFragments(&count))); ASSERT_EQ(count, 1); } void Recursive() { - selector_.base_dir = "/"; - selector_.recursive = true; MakeDataSource(); - int count = 0; - ASSERT_OK(source_->GetFragments(options_).Visit( - [&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - ++count; - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); - - ASSERT_EQ(count, 4); + size_t count = 0; + ASSERT_OK(source_->GetFragments(options_).Visit(OpenFragments(&count))); + ASSERT_EQ(count, file_names().size()); } void DeletedFile() { - selector_.base_dir = "/"; - selector_.recursive = true; MakeDataSource(); ASSERT_GT(file_names().size(), 0); ASSERT_OK(this->fs_->DeleteFile(file_names()[0])); - ASSERT_RAISES( - IOError, - source_->GetFragments(options_).Visit( - [&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + size_t count = 0; + ASSERT_RAISES(IOError, source_->GetFragments(options_).Visit(OpenFragments(&count))); + } + + void PredicatePushDown() { + condition_ = equal(field_ref("alpha"), ScalarExpression::Make(3)); + MakeDataSource(); + + options_->selector = std::make_shared(); + options_->selector->filters.resize(1); + + // with a filter identical to the partition condition, all fragments are yielded + options_->selector->filters[0] = + std::make_shared(condition_->Copy()); + + size_t count = 0; + // ASSERT_OK(source_->GetFragments(context_)->Visit(OpenFragments(&count))); + // ASSERT_EQ(count, file_names().size()); + + // with a filter which contradicts the partition condition, no fragments are yielded + options_->selector->filters[0] = std::make_shared( + equal(field_ref("alpha"), ScalarExpression::Make(0))); + + count = 0; + ASSERT_OK(source_->GetFragments(options_).Visit(OpenFragments(&count))); + ASSERT_EQ(count, 0); } fs::Selector selector_; @@ -263,6 +273,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr format_; std::shared_ptr schema_; std::shared_ptr options_; + std::shared_ptr condition_; }; template diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 4f195334e2f..db1ebb17130 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -53,6 +53,7 @@ class FileFormat; class FileScanOptions; class FileWriteOptions; +class Expression; class Filter; using FilterVector = std::vector>; From a651c65a13c92494d045cb9d429f7a64d028c36c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Aug 2019 12:41:32 -0400 Subject: [PATCH 06/13] rename DataSource::condition to partition_expression --- cpp/src/arrow/dataset/dataset.cc | 2 +- cpp/src/arrow/dataset/dataset.h | 13 +++++++++---- cpp/src/arrow/dataset/test_util.h | 10 +++++----- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 583177cc26b..4cd58afd830 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -69,7 +69,7 @@ DataFragmentIterator DataSource::AssumeCondition( return DataFragmentIterator(); } - auto c = SelectorAssume((*options)->selector, condition_); + auto c = SelectorAssume((*options)->selector, partition_expression_); DCHECK_OK(c.status()); auto expr = std::move(c).ValueOrDie(); diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index b76c1daca94..8114c7694be 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -81,10 +81,14 @@ class ARROW_DS_EXPORT DataSource { /// \brief An expression which evaluates to true for all data viewed by this DataSource. /// May be null, which indicates no information is available. - const std::shared_ptr& condition() const { return condition_; } + const std::shared_ptr& partition_expression() const { + return partition_expression_; + } /// FIXME(bkietz) providing a simple mutator like this is probably not ideal - void condition(std::shared_ptr c) { condition_ = std::move(c); } + void partition_expression(std::shared_ptr e) { + partition_expression_ = std::move(e); + } virtual std::string type() const = 0; @@ -92,13 +96,14 @@ class ARROW_DS_EXPORT DataSource { protected: DataSource() = default; - explicit DataSource(std::shared_ptr c) : condition_(std::move(c)) {} + explicit DataSource(std::shared_ptr c) + : partition_expression_(std::move(c)) {} /// Mutates a ScanOptions by assuming condition_ holds for all yielded fragments. /// Returns non-null if context->selector is not satisfiable in this DataSource. DataFragmentIterator AssumeCondition(std::shared_ptr* options) const; - std::shared_ptr condition_; + std::shared_ptr partition_expression_; }; /// \brief A DataSource consisting of a flat sequence of DataFragments diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 30833c5225d..ca9451e79f8 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -182,7 +182,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { CreateFile(path, ""); } - condition_ = ScalarExpression::Make(true); + partition_expression_ = ScalarExpression::Make(true); } void CreateFile(std::string path, std::string contents) { @@ -197,7 +197,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void MakeDataSource() { ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, &source_)); - source_->condition(condition_); + source_->partition_expression(partition_expression_); } protected: @@ -242,7 +242,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { } void PredicatePushDown() { - condition_ = equal(field_ref("alpha"), ScalarExpression::Make(3)); + partition_expression_ = equal(field_ref("alpha"), ScalarExpression::Make(3)); MakeDataSource(); options_->selector = std::make_shared(); @@ -250,7 +250,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { // with a filter identical to the partition condition, all fragments are yielded options_->selector->filters[0] = - std::make_shared(condition_->Copy()); + std::make_shared(partition_expression_->Copy()); size_t count = 0; // ASSERT_OK(source_->GetFragments(context_)->Visit(OpenFragments(&count))); @@ -273,7 +273,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr format_; std::shared_ptr schema_; std::shared_ptr options_; - std::shared_ptr condition_; + std::shared_ptr partition_expression_; }; template From 19f26a0e54da8c5acd61de08624582cb2c34849f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Aug 2019 16:08:49 -0400 Subject: [PATCH 07/13] DataSource::assume -> bool, remove partition_expr mutator --- cpp/src/arrow/dataset/dataset.cc | 16 ++++++++-------- cpp/src/arrow/dataset/dataset.h | 13 ++++--------- cpp/src/arrow/dataset/file_base.cc | 25 ++++++++++++++++++------- cpp/src/arrow/dataset/file_base.h | 6 ++++++ cpp/src/arrow/dataset/scanner.cc | 1 + cpp/src/arrow/dataset/test_util.h | 5 +++-- 6 files changed, 40 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 4cd58afd830..29ce9794cb3 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -22,6 +22,7 @@ #include "arrow/dataset/filter.h" #include "arrow/dataset/scanner.h" +#include "arrow/util/iterator.h" #include "arrow/util/stl.h" namespace arrow { @@ -61,12 +62,11 @@ Status Dataset::NewScan(std::unique_ptr* out) { return Status::OK(); } -DataFragmentIterator DataSource::AssumeCondition( - std::shared_ptr* options) const { +bool DataSource::AssumePartitionExpression(std::shared_ptr* options) const { DCHECK_NE(options, nullptr); if (*options == nullptr) { // null scan context; no selector to simplify - return DataFragmentIterator(); + return true; } auto c = SelectorAssume((*options)->selector, partition_expression_); @@ -75,19 +75,19 @@ DataFragmentIterator DataSource::AssumeCondition( bool trivial = true; if (expr->IsNull() || (expr->IsTrivialCondition(&trivial) && !trivial)) { - // don't yield any fragments - return MakeEmptyIterator>(); + // selector is not satisfiable; yield no fragments + return false; } *options = std::make_shared(**options); (*options)->selector = ExpressionSelector(std::move(expr)); - return DataFragmentIterator(); + return true; } DataFragmentIterator SimpleDataSource::GetFragments( std::shared_ptr options) { - if (auto empty = AssumeCondition(&options)) { - return empty; + if (!AssumePartitionExpression(&options)) { + return MakeEmptyIterator>(); } return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 8114c7694be..75d1a6eff3a 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -24,7 +24,7 @@ #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" -#include "arrow/util/iterator.h" +#include "arrow/util/macros.h" namespace arrow { namespace dataset { @@ -85,11 +85,6 @@ class ARROW_DS_EXPORT DataSource { return partition_expression_; } - /// FIXME(bkietz) providing a simple mutator like this is probably not ideal - void partition_expression(std::shared_ptr e) { - partition_expression_ = std::move(e); - } - virtual std::string type() const = 0; virtual ~DataSource() = default; @@ -99,9 +94,9 @@ class ARROW_DS_EXPORT DataSource { explicit DataSource(std::shared_ptr c) : partition_expression_(std::move(c)) {} - /// Mutates a ScanOptions by assuming condition_ holds for all yielded fragments. - /// Returns non-null if context->selector is not satisfiable in this DataSource. - DataFragmentIterator AssumeCondition(std::shared_ptr* options) const; + /// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded + /// fragments. Returns false if the selector is not satisfiable in this DataSource. + bool AssumePartitionExpression(std::shared_ptr* options) const; std::shared_ptr partition_expression_; }; diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 73b4ebf7f47..504c5da11b1 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -24,6 +24,7 @@ #include "arrow/filesystem/filesystem.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" +#include "arrow/util/iterator.h" #include "arrow/util/stl.h" namespace arrow { @@ -46,11 +47,12 @@ Status FileBasedDataFragment::Scan(std::shared_ptr scan_context, return format_->ScanFile(source_, scan_options_, scan_context, out); } -FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, - const fs::Selector& selector, - std::shared_ptr format, - std::vector stats) - : filesystem_(filesystem), +FileSystemBasedDataSource::FileSystemBasedDataSource( + fs::FileSystem* filesystem, const fs::Selector& selector, + std::shared_ptr format, std::shared_ptr partition_expression, + std::vector stats) + : DataSource(std::move(partition_expression)), + filesystem_(filesystem), selector_(std::move(selector)), format_(std::move(format)), stats_(std::move(stats)) {} @@ -58,6 +60,7 @@ FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, + std::shared_ptr partition_expression, std::unique_ptr* out) { std::vector stats; RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats)); @@ -70,14 +73,22 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, stats.resize(new_end - stats.begin()); out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format), + std::move(partition_expression), std::move(stats))); return Status::OK(); } +Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, + const fs::Selector& selector, + std::shared_ptr format, + std::unique_ptr* out) { + return Make(filesystem, selector, std::move(format), nullptr, out); +} + DataFragmentIterator FileSystemBasedDataSource::GetFragments( std::shared_ptr options) { - if (auto empty = AssumeCondition(&options)) { - return empty; + if (!AssumePartitionExpression(&options)) { + return MakeEmptyIterator>(); } struct Impl : DataFragmentIterator { diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index ec9ead59302..34b6d05808a 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -172,6 +172,11 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { std::shared_ptr format, std::unique_ptr* out); + static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector, + std::shared_ptr format, + std::shared_ptr partition_expression, + std::unique_ptr* out); + std::string type() const override { return "directory"; } DataFragmentIterator GetFragments(std::shared_ptr options) override; @@ -179,6 +184,7 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, + std::shared_ptr partition_expression, std::vector stats); fs::FileSystem* filesystem_ = NULLPTR; diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 1e5b8a8f2f7..a0f39d05e2b 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -20,6 +20,7 @@ #include #include "arrow/dataset/dataset.h" +#include "arrow/util/iterator.h" namespace arrow { namespace dataset { diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index ca9451e79f8..459185e527f 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -28,6 +28,7 @@ #include "arrow/record_batch.h" #include "arrow/testing/gtest_util.h" #include "arrow/util/io_util.h" +#include "arrow/util/iterator.h" #include "arrow/util/stl.h" namespace arrow { @@ -196,8 +197,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { } void MakeDataSource() { - ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, &source_)); - source_->partition_expression(partition_expression_); + ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, + partition_expression_, &source_)); } protected: From 48b349f1d8a28fa91a073bef501158c66ccf2382 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Aug 2019 16:24:05 -0400 Subject: [PATCH 08/13] move overridable GetFragments to protected GetFragmentsImpl --- cpp/src/arrow/dataset/dataset.cc | 10 +++++++--- cpp/src/arrow/dataset/dataset.h | 8 +++++--- cpp/src/arrow/dataset/file_base.cc | 2 +- cpp/src/arrow/dataset/file_base.h | 4 ++-- 4 files changed, 15 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 29ce9794cb3..1fad8a5a2b9 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -65,7 +65,7 @@ Status Dataset::NewScan(std::unique_ptr* out) { bool DataSource::AssumePartitionExpression(std::shared_ptr* options) const { DCHECK_NE(options, nullptr); if (*options == nullptr) { - // null scan context; no selector to simplify + // null scan options; no selector to simplify return true; } @@ -84,11 +84,15 @@ bool DataSource::AssumePartitionExpression(std::shared_ptr* options return true; } -DataFragmentIterator SimpleDataSource::GetFragments( - std::shared_ptr options) { +DataFragmentIterator DataSource::GetFragments(std::shared_ptr options) { if (!AssumePartitionExpression(&options)) { return MakeEmptyIterator>(); } + return GetFragmentsImpl(std::move(options)); +} + +DataFragmentIterator SimpleDataSource::GetFragmentsImpl( + std::shared_ptr options) { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 75d1a6eff3a..e28d175ca81 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -77,7 +77,7 @@ class ARROW_DS_EXPORT DataSource { public: /// \brief GetFragments returns an iterator of DataFragments. The ScanOptions /// controls filtering and schema inference. - virtual DataFragmentIterator GetFragments(std::shared_ptr options) = 0; + DataFragmentIterator GetFragments(std::shared_ptr options); /// \brief An expression which evaluates to true for all data viewed by this DataSource. /// May be null, which indicates no information is available. @@ -94,9 +94,11 @@ class ARROW_DS_EXPORT DataSource { explicit DataSource(std::shared_ptr c) : partition_expression_(std::move(c)) {} + virtual DataFragmentIterator GetFragmentsImpl(std::shared_ptr options) = 0; + /// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded /// fragments. Returns false if the selector is not satisfiable in this DataSource. - bool AssumePartitionExpression(std::shared_ptr* options) const; + virtual bool AssumePartitionExpression(std::shared_ptr* options) const; std::shared_ptr partition_expression_; }; @@ -107,7 +109,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { explicit SimpleDataSource(DataFragmentVector fragments) : fragments_(std::move(fragments)) {} - DataFragmentIterator GetFragments(std::shared_ptr options) override; + DataFragmentIterator GetFragmentsImpl(std::shared_ptr options) override; std::string type() const override { return "simple_data_source"; } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 504c5da11b1..1ffd0252900 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -85,7 +85,7 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, return Make(filesystem, selector, std::move(format), nullptr, out); } -DataFragmentIterator FileSystemBasedDataSource::GetFragments( +DataFragmentIterator FileSystemBasedDataSource::GetFragmentsImpl( std::shared_ptr options) { if (!AssumePartitionExpression(&options)) { return MakeEmptyIterator>(); diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 34b6d05808a..440b36f75d9 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -179,9 +179,9 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { std::string type() const override { return "directory"; } - DataFragmentIterator GetFragments(std::shared_ptr options) override; - protected: + DataFragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, std::shared_ptr partition_expression, From e8c8cd68a5dcf3a7c8846b4a6bdf354850b18c9c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Thu, 5 Sep 2019 14:43:08 -0400 Subject: [PATCH 09/13] AssumePartitionExpression's inout argument is confusing --- cpp/src/arrow/dataset/dataset.cc | 25 +++++++++++++++---------- cpp/src/arrow/dataset/dataset.h | 4 +++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 1fad8a5a2b9..e3b81977e2b 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -62,14 +62,17 @@ Status Dataset::NewScan(std::unique_ptr* out) { return Status::OK(); } -bool DataSource::AssumePartitionExpression(std::shared_ptr* options) const { - DCHECK_NE(options, nullptr); - if (*options == nullptr) { +bool DataSource::AssumePartitionExpression( + const std::shared_ptr& scan_options, + std::shared_ptr* simplified_scan_options) const { + DCHECK_NE(simplified_scan_options, nullptr); + if (scan_options == nullptr) { // null scan options; no selector to simplify + *simplified_scan_options = scan_options; return true; } - auto c = SelectorAssume((*options)->selector, partition_expression_); + auto c = SelectorAssume(scan_options->selector, partition_expression_); DCHECK_OK(c.status()); auto expr = std::move(c).ValueOrDie(); @@ -79,20 +82,22 @@ bool DataSource::AssumePartitionExpression(std::shared_ptr* options return false; } - *options = std::make_shared(**options); - (*options)->selector = ExpressionSelector(std::move(expr)); + auto copy = std::make_shared(*scan_options); + copy->selector = ExpressionSelector(std::move(expr)); + *simplified_scan_options = std::move(copy); return true; } -DataFragmentIterator DataSource::GetFragments(std::shared_ptr options) { - if (!AssumePartitionExpression(&options)) { +DataFragmentIterator DataSource::GetFragments(std::shared_ptr scan_options) { + std::shared_ptr simplified_scan_options; + if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) { return MakeEmptyIterator>(); } - return GetFragmentsImpl(std::move(options)); + return GetFragmentsImpl(std::move(simplified_scan_options)); } DataFragmentIterator SimpleDataSource::GetFragmentsImpl( - std::shared_ptr options) { + std::shared_ptr scan_options) { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index e28d175ca81..b6f7ca7106a 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -98,7 +98,9 @@ class ARROW_DS_EXPORT DataSource { /// Mutates a ScanOptions by assuming partition_expression_ holds for all yielded /// fragments. Returns false if the selector is not satisfiable in this DataSource. - virtual bool AssumePartitionExpression(std::shared_ptr* options) const; + virtual bool AssumePartitionExpression( + const std::shared_ptr& scan_options, + std::shared_ptr* simplified_scan_options) const; std::shared_ptr partition_expression_; }; From a9e5d7aa6138eec5b0f6e88bb433a1591c3157ea Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 10 Sep 2019 14:58:13 -0400 Subject: [PATCH 10/13] bludgeon MSVC linker error with __forceinline --- cpp/src/arrow/util/iterator.h | 1 + cpp/src/arrow/util/visibility.h | 2 ++ 2 files changed, 3 insertions(+) diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 38ad382eb70..ec138d96691 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -25,6 +25,7 @@ #include "arrow/status.h" #include "arrow/util/functional.h" #include "arrow/util/macros.h" +#include "arrow/util/visibility.h" namespace arrow { diff --git a/cpp/src/arrow/util/visibility.h b/cpp/src/arrow/util/visibility.h index 95cd9cf5ba2..2b3751ed096 100644 --- a/cpp/src/arrow/util/visibility.h +++ b/cpp/src/arrow/util/visibility.h @@ -34,12 +34,14 @@ #endif #define ARROW_NO_EXPORT +#define ARROW_FORCE_INLINE __forceinline #else // Not Windows #ifndef ARROW_EXPORT #define ARROW_EXPORT __attribute__((visibility("default"))) #endif #ifndef ARROW_NO_EXPORT #define ARROW_NO_EXPORT __attribute__((visibility("hidden"))) +#define ARROW_FORCE_INLINE #endif #endif // Non-Windows From 42e2ad30477279fb45e2af85733829aa588ceda9 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 11 Sep 2019 11:21:22 -0400 Subject: [PATCH 11/13] clang-format --- cpp/src/arrow/dataset/file_base.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 1ffd0252900..a335a2a679d 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -86,8 +86,9 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, } DataFragmentIterator FileSystemBasedDataSource::GetFragmentsImpl( - std::shared_ptr options) { - if (!AssumePartitionExpression(&options)) { + std::shared_ptr scan_options) { + std::shared_ptr simplified_scan_options; + if (!AssumePartitionExpression(scan_options, &simplified_scan_options)) { return MakeEmptyIterator>(); } @@ -119,7 +120,7 @@ DataFragmentIterator FileSystemBasedDataSource::GetFragmentsImpl( std::vector stats_; }; - return DataFragmentIterator(Impl(filesystem_, format_, options, stats_)); + return DataFragmentIterator(Impl(filesystem_, format_, scan_options, stats_)); } } // namespace dataset From 13b5948a5d80c78b10d4e00594fc302d81d44618 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 17 Sep 2019 11:00:19 -0400 Subject: [PATCH 12/13] add comment on motivation for type erasure approach --- cpp/src/arrow/util/iterator.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index ec138d96691..20f865cebf2 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -46,9 +46,15 @@ class Iterator { public: /// \brief Iterator may be constructed from any type which has a member function /// with signature Status Next(T*); + /// /// The argument is moved or copied to the heap and kept in a unique_ptr. Only /// its destructor and its Next method (which are stored in function pointers) are /// referenced after construction. + /// + /// This approach is used to dodge MSVC linkage hell (ARROW-6244, ARROW-6558) when using + /// an abstract template base class: instead of being inlined as usual for a template + /// function the base's virtual destructor will be exported, leading to multiple + /// definition errors when linking to any other TU where the base is instantiated. template explicit Iterator(Wrapped has_next) : ptr_(new Wrapped(std::move(has_next)), Delete), next_(Next) {} From 142cc7bfbaf9fad38e206652aeac7345854c7657 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 17 Sep 2019 22:23:13 -0400 Subject: [PATCH 13/13] explicit move for Result returning functions --- cpp/src/arrow/dataset/filter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/filter.cc b/cpp/src/arrow/dataset/filter.cc index 53bbb116218..7e1ee82b22b 100644 --- a/cpp/src/arrow/dataset/filter.cc +++ b/cpp/src/arrow/dataset/filter.cc @@ -957,7 +957,7 @@ Result> SelectorAssume( } if (given == nullptr) { - return out_expr; + return std::move(out_expr); } return out_expr->Assume(*given); }