From 7c9d4671c2894b7ba45208add894e034c4f93d35 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 27 Aug 2019 17:06:25 -0400 Subject: [PATCH 1/8] fold ScanOptions into ScanContext --- cpp/src/arrow/dataset/dataset.cc | 3 +- cpp/src/arrow/dataset/dataset.h | 18 +++--- cpp/src/arrow/dataset/file_base.cc | 32 +++++------ cpp/src/arrow/dataset/file_base.h | 43 ++++---------- cpp/src/arrow/dataset/file_csv.h | 38 +++++++------ cpp/src/arrow/dataset/file_feather.h | 7 +-- cpp/src/arrow/dataset/file_json.h | 39 +++++++------ cpp/src/arrow/dataset/file_parquet.cc | 23 ++++---- cpp/src/arrow/dataset/file_parquet.h | 34 ++++++----- cpp/src/arrow/dataset/file_parquet_test.cc | 15 ++--- cpp/src/arrow/dataset/partition.h | 8 +-- cpp/src/arrow/dataset/scanner.h | 65 +++++++++++++++------- cpp/src/arrow/dataset/test_util.h | 54 +++++++++--------- cpp/src/arrow/dataset/type_fwd.h | 1 - cpp/src/arrow/dataset/writer.h | 10 ++-- 15 files changed, 202 insertions(+), 188 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 66e25d404d0..fd4d9021f42 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -30,8 +30,7 @@ SimpleDataFragment::SimpleDataFragment( std::vector> record_batches) : record_batches_(std::move(record_batches)) {} -Status SimpleDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { +Status SimpleDataFragment::Scan(std::unique_ptr* out) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. auto it = MakeVectorIterator(record_batches_); diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index a30c977a72c..34675163392 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -38,18 +38,17 @@ class ARROW_DS_EXPORT DataFragment { public: /// \brief Scan returns an iterator of ScanTasks, each of which yields /// RecordBatches from this DataFragment. - virtual Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) = 0; + virtual Status Scan(std::unique_ptr* out) = 0; /// \brief Return true if the fragment can benefit from parallel /// scanning virtual bool splittable() const = 0; - /// \brief Filtering, schema reconciliation, and partition options to use when + /// \brief Filtering, schema reconciliation, and partition keys to use when /// scanning this fragment. May be nullptr, which indicates that no filtering /// or schema reconciliation will be performed and all partitions will be /// scanned. - virtual std::shared_ptr scan_options() const = 0; + virtual std::shared_ptr context() const = 0; virtual ~DataFragment() = default; }; @@ -60,12 +59,11 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { public: explicit SimpleDataFragment(std::vector> record_batches); - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::unique_ptr* out) override; bool splittable() const override { return false; } - std::shared_ptr scan_options() const override { return NULLPTR; } + std::shared_ptr context() const override { return NULLPTR; } protected: std::vector> record_batches_; @@ -76,10 +74,10 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { /// and partitions, e.g. files deeply nested in a directory. class ARROW_DS_EXPORT DataSource { public: - /// \brief GetFragments returns an iterator of DataFragments. The ScanOptions + /// \brief GetFragments returns an iterator of DataFragments. The ScanContext /// controls filtering and schema inference. virtual std::unique_ptr GetFragments( - std::shared_ptr options) = 0; + std::shared_ptr context) = 0; virtual std::string type() const = 0; @@ -93,7 +91,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { : fragments_(std::move(fragments)) {} std::unique_ptr GetFragments( - std::shared_ptr options) override { + std::shared_ptr context) override { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index acc45b8bf9a..68f4fe97a3e 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -40,25 +40,25 @@ Status FileSource::Open(std::shared_ptr* out) const return Status::OK(); } -Status FileBasedDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { - return format_->ScanFile(source_, scan_options_, scan_context, out); +Status FileBasedDataFragment::Scan(std::unique_ptr* out) { + return format_->ScanFile(source_, context_, out); } -FileSystemBasedDataSource::FileSystemBasedDataSource( - fs::FileSystem* filesystem, const fs::Selector& selector, - std::shared_ptr format, std::shared_ptr scan_options, - std::vector stats) +FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, + const fs::Selector& selector, + std::shared_ptr format, + std::shared_ptr context, + std::vector stats) : filesystem_(filesystem), selector_(std::move(selector)), format_(std::move(format)), - scan_options_(std::move(scan_options)), + context_(std::move(context)), stats_(std::move(stats)) {} Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, + std::shared_ptr context, std::unique_ptr* out) { std::vector stats; RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats)); @@ -71,18 +71,18 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, stats.resize(new_end - stats.begin()); out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format), - std::move(scan_options), std::move(stats))); + std::move(context), std::move(stats))); return Status::OK(); } std::unique_ptr FileSystemBasedDataSource::GetFragments( - std::shared_ptr options) { + std::shared_ptr context) { struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, - std::shared_ptr scan_options, std::vector stats) + std::shared_ptr context, std::vector stats) : filesystem_(filesystem), format_(std::move(format)), - scan_options_(std::move(scan_options)), + context_(std::move(context)), stats_(std::move(stats)) {} Status Next(std::shared_ptr* out) { @@ -93,7 +93,7 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( FileSource src(stats_[i_++].path(), filesystem_); std::unique_ptr fragment; - RETURN_NOT_OK(format_->MakeFragment(src, scan_options_, &fragment)); + RETURN_NOT_OK(format_->MakeFragment(src, context_, &fragment)); *out = std::move(fragment); return Status::OK(); } @@ -101,11 +101,11 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( size_t i_ = 0; fs::FileSystem* filesystem_; std::shared_ptr format_; - std::shared_ptr scan_options_; + std::shared_ptr context_; std::vector stats_; }; - return internal::make_unique(filesystem_, format_, options, stats_); + return internal::make_unique(filesystem_, format_, context, stats_); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 2053342dcf7..06b68eef620 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -101,22 +101,6 @@ class ARROW_DS_EXPORT FileSource { std::shared_ptr buffer_; }; -/// \brief Base class for file scanning options -class ARROW_DS_EXPORT FileScanOptions : public ScanOptions { - public: - /// \brief The name of the file format this options corresponds to - virtual std::string file_type() const = 0; -}; - -/// \brief Base class for file writing options -class ARROW_DS_EXPORT FileWriteOptions : public WriteOptions { - public: - virtual ~FileWriteOptions() = default; - - /// \brief The name of the file format this options corresponds to - virtual std::string file_type() const = 0; -}; - /// \brief Base class for file format implementation class ARROW_DS_EXPORT FileFormat { public: @@ -128,14 +112,12 @@ class ARROW_DS_EXPORT FileFormat { virtual bool IsKnownExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning - virtual Status ScanFile(const FileSource& source, - std::shared_ptr scan_options, - std::shared_ptr scan_context, + virtual Status ScanFile(const FileSource& source, std::shared_ptr context, std::unique_ptr* out) const = 0; /// \brief Open a fragment virtual Status MakeFragment(const FileSource& location, - std::shared_ptr opts, + std::shared_ptr context, std::unique_ptr* out) = 0; }; @@ -143,23 +125,20 @@ class ARROW_DS_EXPORT FileFormat { class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: FileBasedDataFragment(const FileSource& source, std::shared_ptr format, - std::shared_ptr scan_options) - : source_(source), - format_(std::move(format)), - scan_options_(std::move(scan_options)) {} + std::shared_ptr context) + : source_(source), format_(std::move(format)), context_(std::move(context)) {} - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::unique_ptr* out) override; const FileSource& source() const { return source_; } std::shared_ptr format() const { return format_; } - std::shared_ptr scan_options() const override { return scan_options_; } + std::shared_ptr context() const override { return context_; } protected: FileSource source_; std::shared_ptr format_; - std::shared_ptr scan_options_; + std::shared_ptr context_; }; /// \brief A DataSource which takes files of one format from a directory @@ -171,24 +150,24 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { public: static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, + std::shared_ptr context, std::unique_ptr* out); std::string type() const override { return "directory"; } std::unique_ptr GetFragments( - std::shared_ptr options) override; + std::shared_ptr context) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, + std::shared_ptr context, std::vector stats); fs::FileSystem* filesystem_ = NULLPTR; fs::Selector selector_; std::shared_ptr format_; - std::shared_ptr scan_options_; + std::shared_ptr context_; std::vector stats_; }; diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 6b02085a53c..aeb33502e5e 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -36,9 +36,27 @@ class FileSystem; namespace dataset { +/// \brief A FileFormat implementation that reads from CSV files +class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { + public: + std::string name() const override; + + /// \brief Return true if the given file extension + bool IsKnownExtension(const std::string& ext) const override; + + /// \brief Open a file for scanning + Status ScanFile(const FileSource& source, std::shared_ptr context, + std::unique_ptr* out) const override; + + Status MakeFragment(const FileSource& source, std::shared_ptr context, + std::unique_ptr* out) override; +}; + class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { public: - std::string file_type() const override; + std::shared_ptr file_format() const override { + return std::make_shared(); + } private: csv::ParseOptions parse_options_; @@ -48,21 +66,9 @@ class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions { public: - std::string file_type() const override; -}; - -/// \brief A FileFormat implementation that reads from CSV files -class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { - public: - std::string name() const override; - - /// \brief Return true if the given file extension - bool IsKnownExtension(const std::string& ext) const override; - - /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, - std::unique_ptr* out) const override; + std::shared_ptr file_format() const override { + return std::make_shared(); + } }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index c91585586ad..52b9c6b3f96 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -29,12 +29,12 @@ namespace dataset { class ARROW_DS_EXPORT FeatherScanOptions : public FileScanOptions { public: - std::string file_type() const override; + std::shared_ptr file_format() const override; }; class ARROW_DS_EXPORT FeatherWriterOptions : public FileWriteOptions { public: - std::string file_type() const override; + std::shared_ptr file_format() const override; }; /// \brief A FileFormat implementation that reads from Feather (Arrow @@ -47,8 +47,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, std::shared_ptr context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index e24cc0d9c45..46d46cd685d 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -28,21 +28,6 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { - public: - /// - std::string file_type() const override; - - private: - json::ParseOptions parse_options_; - json::ReadOptions read_options_; -}; - -class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions { - public: - std::string file_type() const override; -}; - /// \brief A FileFormat implementation that reads from JSON files class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { public: @@ -52,9 +37,29 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, std::shared_ptr context, std::unique_ptr* out) const override; + + Status MakeFragment(const FileSource& source, std::shared_ptr context, + std::unique_ptr* out) override; +}; + +class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } + + private: + json::ParseOptions parse_options_; + json::ReadOptions read_options_; +}; + +class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index b761ab24814..7bd33d3f4ff 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -109,16 +109,17 @@ class ParquetRowGroupPartitioner { class ParquetScanTaskIterator : public ScanTaskIterator { public: - static Status Make(std::shared_ptr options, - std::shared_ptr context, ParquetFileReaderPtr reader, + static Status Make(std::shared_ptr context, ParquetFileReaderPtr reader, std::unique_ptr* out) { + DCHECK_NE(context, nullptr); + auto metadata = reader->metadata(); std::vector columns_projection; - RETURN_NOT_OK(InferColumnProjection(*metadata, options, &columns_projection)); + RETURN_NOT_OK(InferColumnProjection(*metadata, context, &columns_projection)); std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader), + RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool(), std::move(reader), &arrow_reader)); out->reset(new ParquetScanTaskIterator(columns_projection, metadata, @@ -143,7 +144,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { private: // Compute the column projection out of an optional arrow::Schema static Status InferColumnProjection(const parquet::FileMetaData& metadata, - const std::shared_ptr& options, + const std::shared_ptr& context, std::vector* out) { // TODO(fsaintjacques): Compute intersection _and_ validity *out = internal::Iota(metadata.num_columns()); @@ -164,22 +165,20 @@ class ParquetScanTaskIterator : public ScanTaskIterator { }; Status ParquetFileFormat::ScanFile(const FileSource& source, - std::shared_ptr scan_options, - std::shared_ptr scan_context, + std::shared_ptr context, std::unique_ptr* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); auto reader = parquet::ParquetFileReader::Open(input); - return ParquetScanTaskIterator::Make(scan_options, scan_context, std::move(reader), - out); + return ParquetScanTaskIterator::Make(std::move(context), std::move(reader), out); } Status ParquetFileFormat::MakeFragment(const FileSource& source, - std::shared_ptr opts, + std::shared_ptr context, std::unique_ptr* out) { - // TODO(bkietz) check location.path() against IsKnownExtension etc - *out = internal::make_unique(source, opts); + // TODO(bkietz) check source.path() against IsKnownExtension etc + *out = internal::make_unique(source, std::move(context)); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index fd462588317..828497ee873 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -27,16 +27,6 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT ParquetScanOptions : public FileScanOptions { - public: - std::string file_type() const override { return "parquet"; } -}; - -class ARROW_DS_EXPORT ParquetWriteOptions : public FileWriteOptions { - public: - std::string file_type() const override { return "parquet"; } -}; - /// \brief A FileFormat implementation that reads from Parquet files class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: @@ -48,21 +38,35 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { } /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, std::shared_ptr context, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, std::shared_ptr opts, + Status MakeFragment(const FileSource& source, std::shared_ptr context, std::unique_ptr* out) override; }; class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { public: - ParquetFragment(const FileSource& source, std::shared_ptr options) - : FileBasedDataFragment(source, std::make_shared(), options) {} + ParquetFragment(const FileSource& source, std::shared_ptr context) + : FileBasedDataFragment(source, std::make_shared(), context) {} bool splittable() const override { return true; } }; +class ARROW_DS_EXPORT ParquetScanOptions : public FileScanOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } + // TODO(bkietz) probably this should wrap an ArrowReaderProperties +}; + +class ARROW_DS_EXPORT ParquetWriteOptions : public FileWriteOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 058a49d6796..0aeb85d3a41 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -142,22 +142,17 @@ class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin { } }; -class TestParquetFileFormat : public ParquetBufferFixtureMixin { - public: - TestParquetFileFormat() : ctx_(std::make_shared()) {} - - protected: - std::shared_ptr opts_; - std::shared_ptr ctx_; -}; +class TestParquetFileFormat : public ParquetBufferFixtureMixin {}; TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto fragment = std::make_shared(*source, opts_); + auto context = std::make_shared(); + context->schema(reader->schema()); + auto fragment = std::make_shared(*source, context); std::unique_ptr it; - ASSERT_OK(fragment->Scan(ctx_, &it)); + ASSERT_OK(fragment->Scan(&it)); int64_t row_count = 0; ASSERT_OK(it->Visit([&row_count](std::unique_ptr task) -> Status { diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 28c55adcc10..01a33fd93af 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -161,17 +161,17 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { public: SimplePartition(std::unique_ptr partition_key, DataFragmentVector&& data_fragments, PartitionVector&& subpartitions, - std::shared_ptr scan_options = NULLPTR) + std::shared_ptr context = NULLPTR) : key_(std::move(partition_key)), data_fragments_(std::move(data_fragments)), subpartitions_(std::move(subpartitions)), - scan_options_(scan_options) {} + context_(context) {} const PartitionKey* key() const override { return key_.get(); } int num_subpartitions() const { return static_cast(subpartitions_.size()); } - int num_data_fragments() const { return static_cast(data_fragments__.size()); } + int num_data_fragments() const { return static_cast(data_fragments_.size()); } const PartitionVector& subpartitions() const { return subpartitions_; } const DataFragmentVector& data_fragments() const { return data_fragments_; } @@ -192,7 +192,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { std::vector> subpartitions_; /// \brief Default scan options to use for data fragments - std::shared_ptr scan_options_; + std::shared_ptr context_; }; /// \brief A PartitionSource that returns fragments as the result of input iterators diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 9177a5418d7..4110539f761 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -29,31 +29,62 @@ namespace arrow { namespace dataset { -/// \brief Shared state for a Scan operation -struct ARROW_DS_EXPORT ScanContext { - MemoryPool* pool = arrow::default_memory_pool(); -}; +/// \brief Base class for file scanning options +class ARROW_DS_EXPORT FileScanOptions { + public: + /// \brief The file format this options corresponds to + virtual std::shared_ptr file_format() const = 0; -// TODO(wesm): API for handling of post-materialization filters. For -// example, if the user requests [$col1 > 0, $col2 > 0] and $col1 is a -// partition key, but $col2 is not, then the filter "$col2 > 0" must -// be evaluated in-memory against the RecordBatch objects resulting -// from the Scan + virtual ~FileScanOptions() = default; +}; -class ARROW_DS_EXPORT ScanOptions { +struct ARROW_DS_EXPORT ScanContext final { public: - virtual ~ScanOptions() = default; + ScanContext() = default; + /// Filters const std::shared_ptr& selector() const { return selector_; } + ScanContext& selector(std::shared_ptr s) { + selector_ = std::move(s); + return *this; + } + + /// Schema to which record batches will be projected const std::shared_ptr& schema() const { return schema_; } + ScanContext& schema(std::shared_ptr s) { + schema_ = std::move(s); + return *this; + } + + /// MemoryPool used for allocating temporary memory and yielded record batches + MemoryPool* pool() const { return pool_; } + + ScanContext& pool(MemoryPool* p) { + pool_ = p; + return *this; + } + + /// format specific options + const std::vector>& options() const { + return options_; + } + + ScanContext& AddFileScanOptions(std::shared_ptr opts) { + options_.push_back(std::move(opts)); + return *this; + } + protected: - // Filters std::shared_ptr selector_; // Schema to which record batches will be reconciled std::shared_ptr schema_; + + MemoryPool* pool_ = default_memory_pool(); + + std::vector> options_; }; /// \brief Read record batches from a range of a single data fragment. A @@ -116,24 +147,19 @@ class ARROW_DS_EXPORT Scanner { class ARROW_DS_EXPORT SimpleScanner : public Scanner { public: SimpleScanner(std::vector> sources, - std::shared_ptr options, std::shared_ptr context) - : sources_(std::move(sources)), - options_(std::move(options)), - context_(std::move(context)) {} + : sources_(std::move(sources)), context_(std::move(context)) {} std::unique_ptr Scan() override; private: std::vector> sources_; - std::shared_ptr options_; std::shared_ptr context_; }; class ARROW_DS_EXPORT ScannerBuilder { public: - ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context); + ScannerBuilder(std::shared_ptr dataset); /// \brief Set ScannerBuilder* Project(const std::vector& columns); @@ -152,7 +178,6 @@ class ARROW_DS_EXPORT ScannerBuilder { private: std::shared_ptr dataset_; - std::shared_ptr scan_context_; std::vector project_columns_; FilterVector filters_; bool include_partition_keys_; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index c4cbaef0dd7..637eaa692cd 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -90,7 +90,7 @@ class DatasetFixtureMixin : public ::testing::Test { void AssertFragmentEquals(RecordBatchReader* expected, DataFragment* fragment, bool ensure_drained = true) { std::unique_ptr it; - ARROW_EXPECT_OK(fragment->Scan(ctx_, &it)); + ARROW_EXPECT_OK(fragment->Scan(&it)); ARROW_EXPECT_OK(it->Visit([&](std::unique_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); @@ -106,7 +106,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragments of a source. void AssertDataSourceEquals(RecordBatchReader* expected, DataSource* source, bool ensure_drained = true) { - auto it = source->GetFragments(options_); + auto it = source->GetFragments(ctx_); ARROW_EXPECT_OK(it->Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); @@ -152,7 +152,6 @@ class DatasetFixtureMixin : public ::testing::Test { } protected: - std::shared_ptr options_ = nullptr; std::shared_ptr ctx_; }; @@ -163,6 +162,9 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void SetUp() override { format_ = std::make_shared(); + schema_ = schema({field("dummy", null())}); + context_ = std::make_shared(); + context_->schema(schema_); ASSERT_OK( TemporaryDir::Make("test-fsdatasource-" + format_->name() + "-", &temp_dir_)); @@ -187,8 +189,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { } void MakeDataSource() { - ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, - std::make_shared(), &source_)); + ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, context_, + &source_)); } protected: @@ -197,8 +199,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + ASSERT_OK(source_->GetFragments(context_)->Visit( + [&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -218,8 +220,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + ASSERT_OK(source_->GetFragments(context_)->Visit( + [&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -242,15 +244,16 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { ASSERT_RAISES( IOError, - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + source_->GetFragments(context_)->Visit( + [&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + auto extension = + fs::internal::GetAbstractPathExtension(file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); } fs::Selector selector_; @@ -259,6 +262,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr fs_; std::unique_ptr temp_dir_; std::shared_ptr format_; + std::shared_ptr context_; + std::shared_ptr schema_; }; template @@ -276,30 +281,29 @@ class DummyFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override { return ext == name(); } /// \brief Open a file for scanning (always returns an empty iterator) - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, std::shared_ptr context, std::unique_ptr* out) const override { *out = internal::make_unique>>(); return Status::OK(); } inline Status MakeFragment(const FileSource& location, - std::shared_ptr opts, + std::shared_ptr context, std::unique_ptr* out) override; }; class DummyFragment : public FileBasedDataFragment { public: - DummyFragment(const FileSource& source, std::shared_ptr options) - : FileBasedDataFragment(source, std::make_shared(), options) {} + DummyFragment(const FileSource& source, std::shared_ptr context) + : FileBasedDataFragment(source, std::make_shared(), context) {} bool splittable() const override { return false; } }; Status DummyFileFormat::MakeFragment(const FileSource& source, - std::shared_ptr opts, + std::shared_ptr context, std::unique_ptr* out) { - *out = internal::make_unique(source, opts); + *out = internal::make_unique(source, context); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 8e3824625ed..d41f61ba187 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -57,7 +57,6 @@ using PartitionVector = std::vector>; using PartitionIterator = Iterator>; struct ScanContext; -class ScanOptions; class Scanner; class ScannerBuilder; class ScanTask; diff --git a/cpp/src/arrow/dataset/writer.h b/cpp/src/arrow/dataset/writer.h index 048a0e54d75..741f63691b6 100644 --- a/cpp/src/arrow/dataset/writer.h +++ b/cpp/src/arrow/dataset/writer.h @@ -18,8 +18,6 @@ #pragma once #include -#include -#include #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -27,9 +25,13 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT WriteOptions { +/// \brief Base class for file writing options +class ARROW_DS_EXPORT FileWriteOptions { public: - virtual ~WriteOptions() = default; + /// \brief The file format this options corresponds to + virtual std::shared_ptr file_format() const = 0; + + virtual ~FileWriteOptions() = default; }; } // namespace dataset From 5ad3e0bca8c531f51e7d9b825fa1f404cdc88014 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Aug 2019 13:11:40 -0400 Subject: [PATCH 2/8] remove ScanContext from shared_ptr --- cpp/src/arrow/dataset/dataset.h | 10 ++++++---- cpp/src/arrow/dataset/file_base.cc | 13 +++++-------- cpp/src/arrow/dataset/file_base.h | 17 ++++++----------- cpp/src/arrow/dataset/file_csv.h | 4 ++-- cpp/src/arrow/dataset/file_feather.h | 2 +- cpp/src/arrow/dataset/file_json.h | 4 ++-- cpp/src/arrow/dataset/file_parquet.cc | 14 +++++--------- cpp/src/arrow/dataset/file_parquet.h | 6 +++--- cpp/src/arrow/dataset/file_parquet_test.cc | 4 +--- cpp/src/arrow/dataset/partition.h | 4 ++-- cpp/src/arrow/dataset/test_util.h | 22 ++++++++-------------- 11 files changed, 41 insertions(+), 59 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 34675163392..47841769d26 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/dataset/scanner.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/iterator.h" @@ -48,7 +49,7 @@ class ARROW_DS_EXPORT DataFragment { /// scanning this fragment. May be nullptr, which indicates that no filtering /// or schema reconciliation will be performed and all partitions will be /// scanned. - virtual std::shared_ptr context() const = 0; + virtual const ScanContext& context() const = 0; virtual ~DataFragment() = default; }; @@ -63,10 +64,11 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { bool splittable() const override { return false; } - std::shared_ptr context() const override { return NULLPTR; } + const ScanContext& context() const override { return context_; } protected: std::vector> record_batches_; + ScanContext context_; }; /// \brief A basic component of a Dataset which yields zero or more @@ -77,7 +79,7 @@ class ARROW_DS_EXPORT DataSource { /// \brief GetFragments returns an iterator of DataFragments. The ScanContext /// controls filtering and schema inference. virtual std::unique_ptr GetFragments( - std::shared_ptr context) = 0; + const ScanContext& context) = 0; virtual std::string type() const = 0; @@ -91,7 +93,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { : fragments_(std::move(fragments)) {} std::unique_ptr GetFragments( - std::shared_ptr context) override { + const ScanContext& context) override { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 68f4fe97a3e..2d16fc2f9c1 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -47,18 +47,15 @@ Status FileBasedDataFragment::Scan(std::unique_ptr* out) { FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr context, std::vector stats) : filesystem_(filesystem), selector_(std::move(selector)), format_(std::move(format)), - context_(std::move(context)), stats_(std::move(stats)) {} Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr context, std::unique_ptr* out) { std::vector stats; RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats)); @@ -71,18 +68,18 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, stats.resize(new_end - stats.begin()); out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format), - std::move(context), std::move(stats))); + std::move(stats))); return Status::OK(); } std::unique_ptr FileSystemBasedDataSource::GetFragments( - std::shared_ptr context) { + const ScanContext& context) { struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, - std::shared_ptr context, std::vector stats) + const ScanContext& context, std::vector stats) : filesystem_(filesystem), format_(std::move(format)), - context_(std::move(context)), + context_(context), stats_(std::move(stats)) {} Status Next(std::shared_ptr* out) { @@ -101,7 +98,7 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( size_t i_ = 0; fs::FileSystem* filesystem_; std::shared_ptr format_; - std::shared_ptr context_; + ScanContext context_; std::vector stats_; }; diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 06b68eef620..0b5ee3ba8ed 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -112,12 +112,11 @@ class ARROW_DS_EXPORT FileFormat { virtual bool IsKnownExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning - virtual Status ScanFile(const FileSource& source, std::shared_ptr context, + virtual Status ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const = 0; /// \brief Open a fragment - virtual Status MakeFragment(const FileSource& location, - std::shared_ptr context, + virtual Status MakeFragment(const FileSource& location, const ScanContext& context, std::unique_ptr* out) = 0; }; @@ -125,7 +124,7 @@ class ARROW_DS_EXPORT FileFormat { class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: FileBasedDataFragment(const FileSource& source, std::shared_ptr format, - std::shared_ptr context) + const ScanContext& context) : source_(source), format_(std::move(format)), context_(std::move(context)) {} Status Scan(std::unique_ptr* out) override; @@ -133,12 +132,12 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { const FileSource& source() const { return source_; } std::shared_ptr format() const { return format_; } - std::shared_ptr context() const override { return context_; } + const ScanContext& context() const override { return context_; } protected: FileSource source_; std::shared_ptr format_; - std::shared_ptr context_; + ScanContext context_; }; /// \brief A DataSource which takes files of one format from a directory @@ -150,24 +149,20 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { public: static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr context, std::unique_ptr* out); std::string type() const override { return "directory"; } - std::unique_ptr GetFragments( - std::shared_ptr context) override; + std::unique_ptr GetFragments(const ScanContext& context) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr context, std::vector stats); fs::FileSystem* filesystem_ = NULLPTR; fs::Selector selector_; std::shared_ptr format_; - std::shared_ptr context_; std::vector stats_; }; diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index aeb33502e5e..352f256b8b0 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -45,10 +45,10 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr context, + Status ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, std::shared_ptr context, + Status MakeFragment(const FileSource& source, const ScanContext& context, std::unique_ptr* out) override; }; diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index 52b9c6b3f96..f1955479010 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -47,7 +47,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr context, + Status ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index 46d46cd685d..9e7899ef111 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -37,10 +37,10 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr context, + Status ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, std::shared_ptr context, + Status MakeFragment(const FileSource& source, const ScanContext& context, std::unique_ptr* out) override; }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 7bd33d3f4ff..290e3127fcb 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -109,17 +109,15 @@ class ParquetRowGroupPartitioner { class ParquetScanTaskIterator : public ScanTaskIterator { public: - static Status Make(std::shared_ptr context, ParquetFileReaderPtr reader, + static Status Make(const ScanContext& context, ParquetFileReaderPtr reader, std::unique_ptr* out) { - DCHECK_NE(context, nullptr); - auto metadata = reader->metadata(); std::vector columns_projection; RETURN_NOT_OK(InferColumnProjection(*metadata, context, &columns_projection)); std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool(), std::move(reader), + RETURN_NOT_OK(parquet::arrow::FileReader::Make(context.pool(), std::move(reader), &arrow_reader)); out->reset(new ParquetScanTaskIterator(columns_projection, metadata, @@ -144,8 +142,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { private: // Compute the column projection out of an optional arrow::Schema static Status InferColumnProjection(const parquet::FileMetaData& metadata, - const std::shared_ptr& context, - std::vector* out) { + const ScanContext& context, std::vector* out) { // TODO(fsaintjacques): Compute intersection _and_ validity *out = internal::Iota(metadata.num_columns()); @@ -164,8 +161,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { std::shared_ptr reader_; }; -Status ParquetFileFormat::ScanFile(const FileSource& source, - std::shared_ptr context, +Status ParquetFileFormat::ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); @@ -175,7 +171,7 @@ Status ParquetFileFormat::ScanFile(const FileSource& source, } Status ParquetFileFormat::MakeFragment(const FileSource& source, - std::shared_ptr context, + const ScanContext& context, std::unique_ptr* out) { // TODO(bkietz) check source.path() against IsKnownExtension etc *out = internal::make_unique(source, std::move(context)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 828497ee873..3e05f79822a 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -38,16 +38,16 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { } /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr context, + Status ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, std::shared_ptr context, + Status MakeFragment(const FileSource& source, const ScanContext& context, std::unique_ptr* out) override; }; class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { public: - ParquetFragment(const FileSource& source, std::shared_ptr context) + ParquetFragment(const FileSource& source, const ScanContext& context) : FileBasedDataFragment(source, std::make_shared(), context) {} bool splittable() const override { return true; } diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 0aeb85d3a41..4f03d1aa411 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -147,9 +147,7 @@ class TestParquetFileFormat : public ParquetBufferFixtureMixin {}; TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto context = std::make_shared(); - context->schema(reader->schema()); - auto fragment = std::make_shared(*source, context); + auto fragment = std::make_shared(*source, ScanContext()); std::unique_ptr it; ASSERT_OK(fragment->Scan(&it)); diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 01a33fd93af..88a7e107b19 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -161,7 +161,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { public: SimplePartition(std::unique_ptr partition_key, DataFragmentVector&& data_fragments, PartitionVector&& subpartitions, - std::shared_ptr context = NULLPTR) + const ScanContext& context = NULLPTR) : key_(std::move(partition_key)), data_fragments_(std::move(data_fragments)), subpartitions_(std::move(subpartitions)), @@ -192,7 +192,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { std::vector> subpartitions_; /// \brief Default scan options to use for data fragments - std::shared_ptr context_; + ScanContext context_; }; /// \brief A PartitionSource that returns fragments as the result of input iterators diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 637eaa692cd..a79a3097401 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -65,8 +65,6 @@ void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) { class DatasetFixtureMixin : public ::testing::Test { public: - DatasetFixtureMixin() : ctx_(std::make_shared()) {} - /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by the data fragment. void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, @@ -152,7 +150,7 @@ class DatasetFixtureMixin : public ::testing::Test { } protected: - std::shared_ptr ctx_; + ScanContext ctx_; }; template @@ -163,8 +161,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void SetUp() override { format_ = std::make_shared(); schema_ = schema({field("dummy", null())}); - context_ = std::make_shared(); - context_->schema(schema_); + context_.schema(schema_); ASSERT_OK( TemporaryDir::Make("test-fsdatasource-" + format_->name() + "-", &temp_dir_)); @@ -189,8 +186,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { } void MakeDataSource() { - ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, context_, - &source_)); + ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, &source_)); } protected: @@ -262,7 +258,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr fs_; std::unique_ptr temp_dir_; std::shared_ptr format_; - std::shared_ptr context_; + ScanContext context_; std::shared_ptr schema_; }; @@ -281,27 +277,25 @@ class DummyFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override { return ext == name(); } /// \brief Open a file for scanning (always returns an empty iterator) - Status ScanFile(const FileSource& source, std::shared_ptr context, + Status ScanFile(const FileSource& source, const ScanContext& context, std::unique_ptr* out) const override { *out = internal::make_unique>>(); return Status::OK(); } - inline Status MakeFragment(const FileSource& location, - std::shared_ptr context, + inline Status MakeFragment(const FileSource& location, const ScanContext& context, std::unique_ptr* out) override; }; class DummyFragment : public FileBasedDataFragment { public: - DummyFragment(const FileSource& source, std::shared_ptr context) + DummyFragment(const FileSource& source, const ScanContext& context) : FileBasedDataFragment(source, std::make_shared(), context) {} bool splittable() const override { return false; } }; -Status DummyFileFormat::MakeFragment(const FileSource& source, - std::shared_ptr context, +Status DummyFileFormat::MakeFragment(const FileSource& source, const ScanContext& context, std::unique_ptr* out) { *out = internal::make_unique(source, context); return Status::OK(); From a7f3b54d53802f09a44fda0f1c80eb84313ea89c Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Fri, 30 Aug 2019 13:20:11 -0400 Subject: [PATCH 3/8] lint fix --- cpp/src/arrow/dataset/scanner.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 4110539f761..baba39e54e9 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -159,7 +159,7 @@ class ARROW_DS_EXPORT SimpleScanner : public Scanner { class ARROW_DS_EXPORT ScannerBuilder { public: - ScannerBuilder(std::shared_ptr dataset); + explicit ScannerBuilder(std::shared_ptr dataset); /// \brief Set ScannerBuilder* Project(const std::vector& columns); From 25a2b90395069c6547708639627967ac30e3b81a Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 3 Sep 2019 12:02:55 -0400 Subject: [PATCH 4/8] rename ScanContext to ScanOptions --- cpp/src/arrow/dataset/dataset.h | 12 ++-- cpp/src/arrow/dataset/file_base.cc | 14 ++--- cpp/src/arrow/dataset/file_base.h | 14 ++--- cpp/src/arrow/dataset/file_csv.h | 4 +- cpp/src/arrow/dataset/file_feather.h | 2 +- cpp/src/arrow/dataset/file_json.h | 4 +- cpp/src/arrow/dataset/file_parquet.cc | 16 ++--- cpp/src/arrow/dataset/file_parquet.h | 8 +-- cpp/src/arrow/dataset/file_parquet_test.cc | 2 +- cpp/src/arrow/dataset/partition.h | 6 +- cpp/src/arrow/dataset/scanner.h | 12 ++-- cpp/src/arrow/dataset/test_util.h | 70 +++++++++++----------- cpp/src/arrow/dataset/type_fwd.h | 2 +- 13 files changed, 84 insertions(+), 82 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 47841769d26..8e75c7f1240 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -49,7 +49,7 @@ class ARROW_DS_EXPORT DataFragment { /// scanning this fragment. May be nullptr, which indicates that no filtering /// or schema reconciliation will be performed and all partitions will be /// scanned. - virtual const ScanContext& context() const = 0; + virtual const ScanOptions& scan_options() const = 0; virtual ~DataFragment() = default; }; @@ -64,11 +64,11 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { bool splittable() const override { return false; } - const ScanContext& context() const override { return context_; } + const ScanOptions& scan_options() const override { return scan_options_; } protected: std::vector> record_batches_; - ScanContext context_; + ScanOptions scan_options_; }; /// \brief A basic component of a Dataset which yields zero or more @@ -76,10 +76,10 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { /// and partitions, e.g. files deeply nested in a directory. class ARROW_DS_EXPORT DataSource { public: - /// \brief GetFragments returns an iterator of DataFragments. The ScanContext + /// \brief GetFragments returns an iterator of DataFragments. The ScanOptions /// controls filtering and schema inference. virtual std::unique_ptr GetFragments( - const ScanContext& context) = 0; + const ScanOptions& scan_options) = 0; virtual std::string type() const = 0; @@ -93,7 +93,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { : fragments_(std::move(fragments)) {} std::unique_ptr GetFragments( - const ScanContext& context) override { + const ScanOptions& scan_options) override { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 2d16fc2f9c1..cac2ef842d8 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -41,7 +41,7 @@ Status FileSource::Open(std::shared_ptr* out) const } Status FileBasedDataFragment::Scan(std::unique_ptr* out) { - return format_->ScanFile(source_, context_, out); + return format_->ScanFile(source_, scan_options_, out); } FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, @@ -73,13 +73,13 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, } std::unique_ptr FileSystemBasedDataSource::GetFragments( - const ScanContext& context) { + const ScanOptions& scan_options) { struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, - const ScanContext& context, std::vector stats) + const ScanOptions& scan_options, std::vector stats) : filesystem_(filesystem), format_(std::move(format)), - context_(context), + scan_options_(scan_options), stats_(std::move(stats)) {} Status Next(std::shared_ptr* out) { @@ -90,7 +90,7 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( FileSource src(stats_[i_++].path(), filesystem_); std::unique_ptr fragment; - RETURN_NOT_OK(format_->MakeFragment(src, context_, &fragment)); + RETURN_NOT_OK(format_->MakeFragment(src, scan_options_, &fragment)); *out = std::move(fragment); return Status::OK(); } @@ -98,11 +98,11 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( size_t i_ = 0; fs::FileSystem* filesystem_; std::shared_ptr format_; - ScanContext context_; + ScanOptions scan_options_; std::vector stats_; }; - return internal::make_unique(filesystem_, format_, context, stats_); + return internal::make_unique(filesystem_, format_, scan_options, stats_); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 0b5ee3ba8ed..f3d5b34bfef 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -112,11 +112,11 @@ class ARROW_DS_EXPORT FileFormat { virtual bool IsKnownExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning - virtual Status ScanFile(const FileSource& source, const ScanContext& context, + virtual Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const = 0; /// \brief Open a fragment - virtual Status MakeFragment(const FileSource& location, const ScanContext& context, + virtual Status MakeFragment(const FileSource& location, const ScanOptions& scan_options, std::unique_ptr* out) = 0; }; @@ -124,20 +124,20 @@ class ARROW_DS_EXPORT FileFormat { class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: FileBasedDataFragment(const FileSource& source, std::shared_ptr format, - const ScanContext& context) - : source_(source), format_(std::move(format)), context_(std::move(context)) {} + const ScanOptions& scan_options) + : source_(source), format_(std::move(format)), scan_options_(std::move(scan_options)) {} Status Scan(std::unique_ptr* out) override; const FileSource& source() const { return source_; } std::shared_ptr format() const { return format_; } - const ScanContext& context() const override { return context_; } + const ScanOptions& scan_options() const override { return scan_options_; } protected: FileSource source_; std::shared_ptr format_; - ScanContext context_; + ScanOptions scan_options_; }; /// \brief A DataSource which takes files of one format from a directory @@ -153,7 +153,7 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { std::string type() const override { return "directory"; } - std::unique_ptr GetFragments(const ScanContext& context) override; + std::unique_ptr GetFragments(const ScanOptions& scan_options) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 352f256b8b0..c43a94f3448 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -45,10 +45,10 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, const ScanContext& context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, const ScanContext& context, + Status MakeFragment(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) override; }; diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index f1955479010..a52d3f1a704 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -47,7 +47,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, const ScanContext& context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index 9e7899ef111..2499e7f1b83 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -37,10 +37,10 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, const ScanContext& context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, const ScanContext& context, + Status MakeFragment(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) override; }; diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 290e3127fcb..52853e33939 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -109,15 +109,15 @@ class ParquetRowGroupPartitioner { class ParquetScanTaskIterator : public ScanTaskIterator { public: - static Status Make(const ScanContext& context, ParquetFileReaderPtr reader, + static Status Make(const ScanOptions& scan_options, ParquetFileReaderPtr reader, std::unique_ptr* out) { auto metadata = reader->metadata(); std::vector columns_projection; - RETURN_NOT_OK(InferColumnProjection(*metadata, context, &columns_projection)); + RETURN_NOT_OK(InferColumnProjection(*metadata, scan_options, &columns_projection)); std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(context.pool(), std::move(reader), + RETURN_NOT_OK(parquet::arrow::FileReader::Make(scan_options.pool(), std::move(reader), &arrow_reader)); out->reset(new ParquetScanTaskIterator(columns_projection, metadata, @@ -142,7 +142,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { private: // Compute the column projection out of an optional arrow::Schema static Status InferColumnProjection(const parquet::FileMetaData& metadata, - const ScanContext& context, std::vector* out) { + const ScanOptions& scan_options, std::vector* out) { // TODO(fsaintjacques): Compute intersection _and_ validity *out = internal::Iota(metadata.num_columns()); @@ -161,20 +161,20 @@ class ParquetScanTaskIterator : public ScanTaskIterator { std::shared_ptr reader_; }; -Status ParquetFileFormat::ScanFile(const FileSource& source, const ScanContext& context, +Status ParquetFileFormat::ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); auto reader = parquet::ParquetFileReader::Open(input); - return ParquetScanTaskIterator::Make(std::move(context), std::move(reader), out); + return ParquetScanTaskIterator::Make(std::move(scan_options), std::move(reader), out); } Status ParquetFileFormat::MakeFragment(const FileSource& source, - const ScanContext& context, + const ScanOptions& scan_options, std::unique_ptr* out) { // TODO(bkietz) check source.path() against IsKnownExtension etc - *out = internal::make_unique(source, std::move(context)); + *out = internal::make_unique(source, std::move(scan_options)); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 3e05f79822a..8d0542d47d3 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -38,17 +38,17 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { } /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, const ScanContext& context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, const ScanContext& context, + Status MakeFragment(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) override; }; class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { public: - ParquetFragment(const FileSource& source, const ScanContext& context) - : FileBasedDataFragment(source, std::make_shared(), context) {} + ParquetFragment(const FileSource& source, const ScanOptions& scan_options) + : FileBasedDataFragment(source, std::make_shared(), scan_options) {} bool splittable() const override { return true; } }; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 4f03d1aa411..1ee4f471216 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -147,7 +147,7 @@ class TestParquetFileFormat : public ParquetBufferFixtureMixin {}; TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto fragment = std::make_shared(*source, ScanContext()); + auto fragment = std::make_shared(*source, ScanOptions()); std::unique_ptr it; ASSERT_OK(fragment->Scan(&it)); diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 88a7e107b19..880141adb58 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -161,11 +161,11 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { public: SimplePartition(std::unique_ptr partition_key, DataFragmentVector&& data_fragments, PartitionVector&& subpartitions, - const ScanContext& context = NULLPTR) + const ScanOptions& scan_options = NULLPTR) : key_(std::move(partition_key)), data_fragments_(std::move(data_fragments)), subpartitions_(std::move(subpartitions)), - context_(context) {} + scan_options_(scan_options) {} const PartitionKey* key() const override { return key_.get(); } @@ -192,7 +192,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { std::vector> subpartitions_; /// \brief Default scan options to use for data fragments - ScanContext context_; + ScanOptions scan_options_; }; /// \brief A PartitionSource that returns fragments as the result of input iterators diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index baba39e54e9..31e8e8cf68f 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -38,14 +38,14 @@ class ARROW_DS_EXPORT FileScanOptions { virtual ~FileScanOptions() = default; }; -struct ARROW_DS_EXPORT ScanContext final { +struct ARROW_DS_EXPORT ScanOptions final { public: - ScanContext() = default; + ScanOptions() = default; /// Filters const std::shared_ptr& selector() const { return selector_; } - ScanContext& selector(std::shared_ptr s) { + ScanOptions& selector(std::shared_ptr s) { selector_ = std::move(s); return *this; } @@ -53,7 +53,7 @@ struct ARROW_DS_EXPORT ScanContext final { /// Schema to which record batches will be projected const std::shared_ptr& schema() const { return schema_; } - ScanContext& schema(std::shared_ptr s) { + ScanOptions& schema(std::shared_ptr s) { schema_ = std::move(s); return *this; } @@ -61,7 +61,7 @@ struct ARROW_DS_EXPORT ScanContext final { /// MemoryPool used for allocating temporary memory and yielded record batches MemoryPool* pool() const { return pool_; } - ScanContext& pool(MemoryPool* p) { + ScanOptions& pool(MemoryPool* p) { pool_ = p; return *this; } @@ -71,7 +71,7 @@ struct ARROW_DS_EXPORT ScanContext final { return options_; } - ScanContext& AddFileScanOptions(std::shared_ptr opts) { + ScanOptions& AddFileScanOptions(std::shared_ptr opts) { options_.push_back(std::move(opts)); return *this; } diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index a79a3097401..f37563d6896 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -104,7 +104,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragments of a source. void AssertDataSourceEquals(RecordBatchReader* expected, DataSource* source, bool ensure_drained = true) { - auto it = source->GetFragments(ctx_); + auto it = source->GetFragments(scan_options_); ARROW_EXPECT_OK(it->Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); @@ -150,7 +150,7 @@ class DatasetFixtureMixin : public ::testing::Test { } protected: - ScanContext ctx_; + ScanOptions scan_options_; }; template @@ -161,7 +161,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void SetUp() override { format_ = std::make_shared(); schema_ = schema({field("dummy", null())}); - context_.schema(schema_); + scan_options_.schema(schema_); ASSERT_OK( TemporaryDir::Make("test-fsdatasource-" + format_->name() + "-", &temp_dir_)); @@ -195,17 +195,17 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK(source_->GetFragments(context_)->Visit( - [&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - ++count; - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + ASSERT_OK(source_->GetFragments(scan_options_) + ->Visit([&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + ++count; + auto extension = fs::internal::GetAbstractPathExtension( + file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); ASSERT_EQ(count, 1); } @@ -216,17 +216,17 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK(source_->GetFragments(context_)->Visit( - [&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - ++count; - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + ASSERT_OK(source_->GetFragments(scan_options_) + ->Visit([&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + ++count; + auto extension = fs::internal::GetAbstractPathExtension( + file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); ASSERT_EQ(count, 4); } @@ -240,8 +240,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { ASSERT_RAISES( IOError, - source_->GetFragments(context_)->Visit( - [&](std::shared_ptr fragment) { + source_->GetFragments(scan_options_) + ->Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); auto extension = @@ -258,7 +258,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr fs_; std::unique_ptr temp_dir_; std::shared_ptr format_; - ScanContext context_; + ScanOptions scan_options_; std::shared_ptr schema_; }; @@ -277,27 +277,29 @@ class DummyFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override { return ext == name(); } /// \brief Open a file for scanning (always returns an empty iterator) - Status ScanFile(const FileSource& source, const ScanContext& context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override { *out = internal::make_unique>>(); return Status::OK(); } - inline Status MakeFragment(const FileSource& location, const ScanContext& context, + inline Status MakeFragment(const FileSource& location, const ScanOptions& scan_options, std::unique_ptr* out) override; }; class DummyFragment : public FileBasedDataFragment { public: - DummyFragment(const FileSource& source, const ScanContext& context) - : FileBasedDataFragment(source, std::make_shared(), context) {} + DummyFragment(const FileSource& source, const ScanOptions& scan_options) + : FileBasedDataFragment(source, std::make_shared(), scan_options) { + } bool splittable() const override { return false; } }; -Status DummyFileFormat::MakeFragment(const FileSource& source, const ScanContext& context, +Status DummyFileFormat::MakeFragment(const FileSource& source, + const ScanOptions& scan_options, std::unique_ptr* out) { - *out = internal::make_unique(source, context); + *out = internal::make_unique(source, scan_options); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index d41f61ba187..0e8665d1943 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -56,7 +56,7 @@ class PartitionScheme; using PartitionVector = std::vector>; using PartitionIterator = Iterator>; -struct ScanContext; +struct ScanOptions; class Scanner; class ScannerBuilder; class ScanTask; From 398a2d0f12ad431815eb94ae0ea4511c43389b00 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 3 Sep 2019 12:05:38 -0400 Subject: [PATCH 5/8] renaming in ScanOptions --- cpp/src/arrow/dataset/scanner.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 31e8e8cf68f..e4e1ff9853a 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -66,13 +66,13 @@ struct ARROW_DS_EXPORT ScanOptions final { return *this; } - /// format specific options - const std::vector>& options() const { - return options_; + /// Format-specific file scanning options + const std::vector>& file_scan_options() const { + return file_scan_options_; } - ScanOptions& AddFileScanOptions(std::shared_ptr opts) { - options_.push_back(std::move(opts)); + ScanOptions& AddFileScanOptions(std::shared_ptr file_scan_options) { + file_scan_options_.push_back(std::move(file_scan_options)); return *this; } @@ -84,7 +84,7 @@ struct ARROW_DS_EXPORT ScanOptions final { MemoryPool* pool_ = default_memory_pool(); - std::vector> options_; + std::vector> file_scan_options_; }; /// \brief Read record batches from a range of a single data fragment. A From 655a54626ef03623254f68dd5db20f9215d9e1dd Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Tue, 3 Sep 2019 12:31:31 -0400 Subject: [PATCH 6/8] move FileScanOptions into ScanOptions --- cpp/src/arrow/dataset/file_csv.h | 2 +- cpp/src/arrow/dataset/file_feather.h | 2 +- cpp/src/arrow/dataset/file_json.h | 2 +- cpp/src/arrow/dataset/file_parquet.h | 2 +- cpp/src/arrow/dataset/scanner.h | 41 ++++++++++++++++++---------- cpp/src/arrow/dataset/type_fwd.h | 1 - 6 files changed, 30 insertions(+), 20 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index c43a94f3448..bbe490c2d7b 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -52,7 +52,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { std::unique_ptr* out) override; }; -class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT CsvScanOptions : public ScanOptions::FileOptions { public: std::shared_ptr file_format() const override { return std::make_shared(); diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index a52d3f1a704..1375ab0948b 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -27,7 +27,7 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT FeatherScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT FeatherScanOptions : public ScanOptions::FileOptions { public: std::shared_ptr file_format() const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index 2499e7f1b83..23e3dcb3e58 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -44,7 +44,7 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { std::unique_ptr* out) override; }; -class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT JsonScanOptions : public ScanOptions::FileOptions { public: std::shared_ptr file_format() const override { return std::make_shared(); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 8d0542d47d3..402f57125fb 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -53,7 +53,7 @@ class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { bool splittable() const override { return true; } }; -class ARROW_DS_EXPORT ParquetScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT ParquetScanOptions : public ScanOptions::FileOptions { public: std::shared_ptr file_format() const override { return std::make_shared(); diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index e4e1ff9853a..38f8b60901e 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -29,15 +29,7 @@ namespace arrow { namespace dataset { -/// \brief Base class for file scanning options -class ARROW_DS_EXPORT FileScanOptions { - public: - /// \brief The file format this options corresponds to - virtual std::shared_ptr file_format() const = 0; - - virtual ~FileScanOptions() = default; -}; - +/// Container for scan state struct ARROW_DS_EXPORT ScanOptions final { public: ScanOptions() = default; @@ -66,17 +58,36 @@ struct ARROW_DS_EXPORT ScanOptions final { return *this; } + /// \brief Base class for format specific file scanning options + class ARROW_DS_EXPORT FileOptions { + public: + /// \brief The file format this options corresponds to + virtual std::shared_ptr file_format() const = 0; + + virtual ~FileOptions() = default; + }; + /// Format-specific file scanning options - const std::vector>& file_scan_options() const { - return file_scan_options_; + const std::vector>& file_options() const { + return file_options_; } - ScanOptions& AddFileScanOptions(std::shared_ptr file_scan_options) { - file_scan_options_.push_back(std::move(file_scan_options)); + ScanOptions& AddFileOptions(std::shared_ptr file_options) { + file_options_.push_back(std::move(file_options)); + return *this; + } + + /// Include columns generated from partition keys + bool include_partition_keys() const { return include_partition_keys_; } + + ScanOptions& include_partition_keys(bool include_partition_keys) { + include_partition_keys_ = include_partition_keys; return *this; } protected: + bool include_partition_keys_; + std::shared_ptr selector_; // Schema to which record batches will be reconciled @@ -84,7 +95,7 @@ struct ARROW_DS_EXPORT ScanOptions final { MemoryPool* pool_ = default_memory_pool(); - std::vector> file_scan_options_; + std::vector> file_options_; }; /// \brief Read record batches from a range of a single data fragment. A @@ -166,7 +177,7 @@ class ARROW_DS_EXPORT ScannerBuilder { ScannerBuilder* AddFilter(const std::shared_ptr& filter); - ScannerBuilder* SetGlobalFileOptions(std::shared_ptr options); + ScannerBuilder* SetGlobalFileOptions(std::shared_ptr options); /// \brief If true (default), add partition keys to the /// RecordBatches that the scan produces if they are not in the data diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 0e8665d1943..b3f72c4099d 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -44,7 +44,6 @@ struct DiscoveryOptions; class FileBasedDataFragment; class FileFormat; -class FileScanOptions; class FileWriteOptions; class Filter; From 7b243e7832875e7a3980f7729d4d43725338dd63 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 9 Sep 2019 10:42:28 -0400 Subject: [PATCH 7/8] fix merge errors --- cpp/src/arrow/dataset/dataset.cc | 3 +-- cpp/src/arrow/dataset/scanner.cc | 24 ++++++++++-------------- cpp/src/arrow/dataset/scanner.h | 7 +++---- cpp/src/arrow/dataset/scanner_test.cc | 2 +- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index fd4d9021f42..26f9234df08 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -55,8 +55,7 @@ Status Dataset::Make(const std::vector>& sources, } Status Dataset::NewScan(std::unique_ptr* out) { - auto context = std::make_shared(); - out->reset(new ScannerBuilder(this->shared_from_this(), context)); + out->reset(new ScannerBuilder(this->shared_from_this())); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 4ad0f6a537c..7b7e2c08065 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -31,8 +31,7 @@ std::unique_ptr SimpleScanTask::Scan() { /// \brief GetFragmentsIterator transforms a vector in a flattened /// Iterator. static std::unique_ptr GetFragmentsIterator( - const std::vector>& sources, - std::shared_ptr options) { + const std::vector>& sources, const ScanOptions& options) { // Iterator auto sources_it = MakeVectorIterator(sources); @@ -50,12 +49,11 @@ static std::unique_ptr GetFragmentsIterator( /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. static std::unique_ptr GetScanTaskIterator( - std::unique_ptr fragments, - std::shared_ptr context) { + std::unique_ptr fragments) { // DataFragment -> ScanTaskIterator - auto fn = [context](std::shared_ptr fragment, - std::unique_ptr* out) -> Status { - return fragment->Scan(context, out); + auto fn = [](std::shared_ptr fragment, + std::unique_ptr* out) -> Status { + return fragment->Scan(out); }; // Iterator> @@ -73,11 +71,10 @@ std::unique_ptr SimpleScanner::Scan() { // Second, transforms Iterator into a unified // Iterator. The first Iterator::Next invocation is going to do // all the work of unwinding the chained iterators. - return GetScanTaskIterator(std::move(fragments_it), context_); + return GetScanTaskIterator(std::move(fragments_it)); } -ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context) - : dataset_(std::move(dataset)), scan_context_(std::move(scan_context)) {} +ScannerBuilder::ScannerBuilder(std::shared_ptr dataset) + : dataset_(std::move(dataset)) {} ScannerBuilder* ScannerBuilder::Project(const std::vector& columns) { return this; @@ -88,7 +85,7 @@ ScannerBuilder* ScannerBuilder::AddFilter(const std::shared_ptr& filter) } ScannerBuilder* ScannerBuilder::SetGlobalFileOptions( - std::shared_ptr options) { + std::shared_ptr options) { return this; } @@ -98,8 +95,7 @@ ScannerBuilder* ScannerBuilder::IncludePartitionKeys(bool include) { } Status ScannerBuilder::Finish(std::unique_ptr* out) const { - auto options = std::make_shared(); - out->reset(new SimpleScanner(dataset_->sources(), options, scan_context_)); + out->reset(new SimpleScanner(dataset_->sources(), ScanOptions())); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 38f8b60901e..6dd22fceb90 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -157,15 +157,14 @@ class ARROW_DS_EXPORT Scanner { /// returning a ScanTaskIterator. class ARROW_DS_EXPORT SimpleScanner : public Scanner { public: - SimpleScanner(std::vector> sources, - std::shared_ptr context) - : sources_(std::move(sources)), context_(std::move(context)) {} + SimpleScanner(std::vector> sources, ScanOptions options) + : sources_(std::move(sources)), options_(std::move(options)) {} std::unique_ptr Scan() override; private: std::vector> sources_; - std::shared_ptr context_; + ScanOptions options_; }; class ARROW_DS_EXPORT ScannerBuilder { diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 3168bdd0745..df691b9c254 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -45,7 +45,7 @@ TEST_F(TestSimpleScanner, Scan) { const int64_t total_batches = sources.size() * kNumberBatches * kNumberFragments; auto reader = ConstantArrayGenerator::Repeat(total_batches, batch); - SimpleScanner scanner{sources, options_, ctx_}; + SimpleScanner scanner{sources, ScanOptions()}; // Verifies that the unified BatchReader is equivalent to flattening all the // structures of the scanner, i.e. Scanner[DataSource[ScanTask[RecordBatch]]] From 598ea6fa4c3b27a18296d2aa114f39384b6e2871 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 9 Sep 2019 14:50:16 -0400 Subject: [PATCH 8/8] clang-format --- cpp/src/arrow/dataset/file_base.h | 7 +++++-- cpp/src/arrow/dataset/file_parquet.cc | 6 ++++-- cpp/src/arrow/dataset/file_parquet.h | 3 ++- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index f3d5b34bfef..1549ccea994 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -125,7 +125,9 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: FileBasedDataFragment(const FileSource& source, std::shared_ptr format, const ScanOptions& scan_options) - : source_(source), format_(std::move(format)), scan_options_(std::move(scan_options)) {} + : source_(source), + format_(std::move(format)), + scan_options_(std::move(scan_options)) {} Status Scan(std::unique_ptr* out) override; @@ -153,7 +155,8 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { std::string type() const override { return "directory"; } - std::unique_ptr GetFragments(const ScanOptions& scan_options) override; + std::unique_ptr GetFragments( + const ScanOptions& scan_options) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index 52853e33939..b4379ef08a2 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -142,7 +142,8 @@ class ParquetScanTaskIterator : public ScanTaskIterator { private: // Compute the column projection out of an optional arrow::Schema static Status InferColumnProjection(const parquet::FileMetaData& metadata, - const ScanOptions& scan_options, std::vector* out) { + const ScanOptions& scan_options, + std::vector* out) { // TODO(fsaintjacques): Compute intersection _and_ validity *out = internal::Iota(metadata.num_columns()); @@ -161,7 +162,8 @@ class ParquetScanTaskIterator : public ScanTaskIterator { std::shared_ptr reader_; }; -Status ParquetFileFormat::ScanFile(const FileSource& source, const ScanOptions& scan_options, +Status ParquetFileFormat::ScanFile(const FileSource& source, + const ScanOptions& scan_options, std::unique_ptr* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index 402f57125fb..47bde686ef8 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -48,7 +48,8 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { public: ParquetFragment(const FileSource& source, const ScanOptions& scan_options) - : FileBasedDataFragment(source, std::make_shared(), scan_options) {} + : FileBasedDataFragment(source, std::make_shared(), + scan_options) {} bool splittable() const override { return true; } };