diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 66e25d404d0..26f9234df08 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -30,8 +30,7 @@ SimpleDataFragment::SimpleDataFragment( std::vector> record_batches) : record_batches_(std::move(record_batches)) {} -Status SimpleDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { +Status SimpleDataFragment::Scan(std::unique_ptr* out) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. auto it = MakeVectorIterator(record_batches_); @@ -56,8 +55,7 @@ Status Dataset::Make(const std::vector>& sources, } Status Dataset::NewScan(std::unique_ptr* out) { - auto context = std::make_shared(); - out->reset(new ScannerBuilder(this->shared_from_this(), context)); + out->reset(new ScannerBuilder(this->shared_from_this())); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index a30c977a72c..8e75c7f1240 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/dataset/scanner.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" #include "arrow/util/iterator.h" @@ -38,18 +39,17 @@ class ARROW_DS_EXPORT DataFragment { public: /// \brief Scan returns an iterator of ScanTasks, each of which yields /// RecordBatches from this DataFragment. - virtual Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) = 0; + virtual Status Scan(std::unique_ptr* out) = 0; /// \brief Return true if the fragment can benefit from parallel /// scanning virtual bool splittable() const = 0; - /// \brief Filtering, schema reconciliation, and partition options to use when + /// \brief Filtering, schema reconciliation, and partition keys to use when /// scanning this fragment. May be nullptr, which indicates that no filtering /// or schema reconciliation will be performed and all partitions will be /// scanned. - virtual std::shared_ptr scan_options() const = 0; + virtual const ScanOptions& scan_options() const = 0; virtual ~DataFragment() = default; }; @@ -60,15 +60,15 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { public: explicit SimpleDataFragment(std::vector> record_batches); - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::unique_ptr* out) override; bool splittable() const override { return false; } - std::shared_ptr scan_options() const override { return NULLPTR; } + const ScanOptions& scan_options() const override { return scan_options_; } protected: std::vector> record_batches_; + ScanOptions scan_options_; }; /// \brief A basic component of a Dataset which yields zero or more @@ -79,7 +79,7 @@ class ARROW_DS_EXPORT DataSource { /// \brief GetFragments returns an iterator of DataFragments. The ScanOptions /// controls filtering and schema inference. virtual std::unique_ptr GetFragments( - std::shared_ptr options) = 0; + const ScanOptions& scan_options) = 0; virtual std::string type() const = 0; @@ -93,7 +93,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { : fragments_(std::move(fragments)) {} std::unique_ptr GetFragments( - std::shared_ptr options) override { + const ScanOptions& scan_options) override { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index acc45b8bf9a..cac2ef842d8 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -40,25 +40,22 @@ Status FileSource::Open(std::shared_ptr* out) const return Status::OK(); } -Status FileBasedDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { - return format_->ScanFile(source_, scan_options_, scan_context, out); +Status FileBasedDataFragment::Scan(std::unique_ptr* out) { + return format_->ScanFile(source_, scan_options_, out); } -FileSystemBasedDataSource::FileSystemBasedDataSource( - fs::FileSystem* filesystem, const fs::Selector& selector, - std::shared_ptr format, std::shared_ptr scan_options, - std::vector stats) +FileSystemBasedDataSource::FileSystemBasedDataSource(fs::FileSystem* filesystem, + const fs::Selector& selector, + std::shared_ptr format, + std::vector stats) : filesystem_(filesystem), selector_(std::move(selector)), format_(std::move(format)), - scan_options_(std::move(scan_options)), stats_(std::move(stats)) {} Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, std::unique_ptr* out) { std::vector stats; RETURN_NOT_OK(filesystem->GetTargetStats(selector, &stats)); @@ -71,18 +68,18 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, stats.resize(new_end - stats.begin()); out->reset(new FileSystemBasedDataSource(filesystem, selector, std::move(format), - std::move(scan_options), std::move(stats))); + std::move(stats))); return Status::OK(); } std::unique_ptr FileSystemBasedDataSource::GetFragments( - std::shared_ptr options) { + const ScanOptions& scan_options) { struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, - std::shared_ptr scan_options, std::vector stats) + const ScanOptions& scan_options, std::vector stats) : filesystem_(filesystem), format_(std::move(format)), - scan_options_(std::move(scan_options)), + scan_options_(scan_options), stats_(std::move(stats)) {} Status Next(std::shared_ptr* out) { @@ -101,11 +98,11 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( size_t i_ = 0; fs::FileSystem* filesystem_; std::shared_ptr format_; - std::shared_ptr scan_options_; + ScanOptions scan_options_; std::vector stats_; }; - return internal::make_unique(filesystem_, format_, options, stats_); + return internal::make_unique(filesystem_, format_, scan_options, stats_); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 2053342dcf7..1549ccea994 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -101,22 +101,6 @@ class ARROW_DS_EXPORT FileSource { std::shared_ptr buffer_; }; -/// \brief Base class for file scanning options -class ARROW_DS_EXPORT FileScanOptions : public ScanOptions { - public: - /// \brief The name of the file format this options corresponds to - virtual std::string file_type() const = 0; -}; - -/// \brief Base class for file writing options -class ARROW_DS_EXPORT FileWriteOptions : public WriteOptions { - public: - virtual ~FileWriteOptions() = default; - - /// \brief The name of the file format this options corresponds to - virtual std::string file_type() const = 0; -}; - /// \brief Base class for file format implementation class ARROW_DS_EXPORT FileFormat { public: @@ -128,14 +112,11 @@ class ARROW_DS_EXPORT FileFormat { virtual bool IsKnownExtension(const std::string& ext) const = 0; /// \brief Open a file for scanning - virtual Status ScanFile(const FileSource& source, - std::shared_ptr scan_options, - std::shared_ptr scan_context, + virtual Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const = 0; /// \brief Open a fragment - virtual Status MakeFragment(const FileSource& location, - std::shared_ptr opts, + virtual Status MakeFragment(const FileSource& location, const ScanOptions& scan_options, std::unique_ptr* out) = 0; }; @@ -143,23 +124,22 @@ class ARROW_DS_EXPORT FileFormat { class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { public: FileBasedDataFragment(const FileSource& source, std::shared_ptr format, - std::shared_ptr scan_options) + const ScanOptions& scan_options) : source_(source), format_(std::move(format)), scan_options_(std::move(scan_options)) {} - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::unique_ptr* out) override; const FileSource& source() const { return source_; } std::shared_ptr format() const { return format_; } - std::shared_ptr scan_options() const override { return scan_options_; } + const ScanOptions& scan_options() const override { return scan_options_; } protected: FileSource source_; std::shared_ptr format_; - std::shared_ptr scan_options_; + ScanOptions scan_options_; }; /// \brief A DataSource which takes files of one format from a directory @@ -171,24 +151,21 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { public: static Status Make(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, std::unique_ptr* out); std::string type() const override { return "directory"; } std::unique_ptr GetFragments( - std::shared_ptr options) override; + const ScanOptions& scan_options) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, std::shared_ptr format, - std::shared_ptr scan_options, std::vector stats); fs::FileSystem* filesystem_ = NULLPTR; fs::Selector selector_; std::shared_ptr format_; - std::shared_ptr scan_options_; std::vector stats_; }; diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 6b02085a53c..bbe490c2d7b 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -36,21 +36,6 @@ class FileSystem; namespace dataset { -class ARROW_DS_EXPORT CsvScanOptions : public FileScanOptions { - public: - std::string file_type() const override; - - private: - csv::ParseOptions parse_options_; - csv::ConvertOptions convert_options_; - csv::ReadOptions read_options_; -}; - -class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions { - public: - std::string file_type() const override; -}; - /// \brief A FileFormat implementation that reads from CSV files class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { public: @@ -60,9 +45,30 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; + + Status MakeFragment(const FileSource& source, const ScanOptions& scan_options, + std::unique_ptr* out) override; +}; + +class ARROW_DS_EXPORT CsvScanOptions : public ScanOptions::FileOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } + + private: + csv::ParseOptions parse_options_; + csv::ConvertOptions convert_options_; + csv::ReadOptions read_options_; +}; + +class ARROW_DS_EXPORT CsvWriteOptions : public FileWriteOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index c91585586ad..1375ab0948b 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -27,14 +27,14 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT FeatherScanOptions : public FileScanOptions { +class ARROW_DS_EXPORT FeatherScanOptions : public ScanOptions::FileOptions { public: - std::string file_type() const override; + std::shared_ptr file_format() const override; }; class ARROW_DS_EXPORT FeatherWriterOptions : public FileWriteOptions { public: - std::string file_type() const override; + std::shared_ptr file_format() const override; }; /// \brief A FileFormat implementation that reads from Feather (Arrow @@ -47,8 +47,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; }; diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index e24cc0d9c45..23e3dcb3e58 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -28,21 +28,6 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT JsonScanOptions : public FileScanOptions { - public: - /// - std::string file_type() const override; - - private: - json::ParseOptions parse_options_; - json::ReadOptions read_options_; -}; - -class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions { - public: - std::string file_type() const override; -}; - /// \brief A FileFormat implementation that reads from JSON files class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { public: @@ -52,9 +37,29 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override; /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; + + Status MakeFragment(const FileSource& source, const ScanOptions& scan_options, + std::unique_ptr* out) override; +}; + +class ARROW_DS_EXPORT JsonScanOptions : public ScanOptions::FileOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } + + private: + json::ParseOptions parse_options_; + json::ReadOptions read_options_; +}; + +class ARROW_DS_EXPORT JsonWriteOptions : public FileWriteOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index b761ab24814..b4379ef08a2 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -109,16 +109,15 @@ class ParquetRowGroupPartitioner { class ParquetScanTaskIterator : public ScanTaskIterator { public: - static Status Make(std::shared_ptr options, - std::shared_ptr context, ParquetFileReaderPtr reader, + static Status Make(const ScanOptions& scan_options, ParquetFileReaderPtr reader, std::unique_ptr* out) { auto metadata = reader->metadata(); std::vector columns_projection; - RETURN_NOT_OK(InferColumnProjection(*metadata, options, &columns_projection)); + RETURN_NOT_OK(InferColumnProjection(*metadata, scan_options, &columns_projection)); std::unique_ptr arrow_reader; - RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader), + RETURN_NOT_OK(parquet::arrow::FileReader::Make(scan_options.pool(), std::move(reader), &arrow_reader)); out->reset(new ParquetScanTaskIterator(columns_projection, metadata, @@ -143,7 +142,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { private: // Compute the column projection out of an optional arrow::Schema static Status InferColumnProjection(const parquet::FileMetaData& metadata, - const std::shared_ptr& options, + const ScanOptions& scan_options, std::vector* out) { // TODO(fsaintjacques): Compute intersection _and_ validity *out = internal::Iota(metadata.num_columns()); @@ -164,22 +163,20 @@ class ParquetScanTaskIterator : public ScanTaskIterator { }; Status ParquetFileFormat::ScanFile(const FileSource& source, - std::shared_ptr scan_options, - std::shared_ptr scan_context, + const ScanOptions& scan_options, std::unique_ptr* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); auto reader = parquet::ParquetFileReader::Open(input); - return ParquetScanTaskIterator::Make(scan_options, scan_context, std::move(reader), - out); + return ParquetScanTaskIterator::Make(std::move(scan_options), std::move(reader), out); } Status ParquetFileFormat::MakeFragment(const FileSource& source, - std::shared_ptr opts, + const ScanOptions& scan_options, std::unique_ptr* out) { - // TODO(bkietz) check location.path() against IsKnownExtension etc - *out = internal::make_unique(source, opts); + // TODO(bkietz) check source.path() against IsKnownExtension etc + *out = internal::make_unique(source, std::move(scan_options)); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index fd462588317..47bde686ef8 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -27,16 +27,6 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT ParquetScanOptions : public FileScanOptions { - public: - std::string file_type() const override { return "parquet"; } -}; - -class ARROW_DS_EXPORT ParquetWriteOptions : public FileWriteOptions { - public: - std::string file_type() const override { return "parquet"; } -}; - /// \brief A FileFormat implementation that reads from Parquet files class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { public: @@ -48,21 +38,36 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { } /// \brief Open a file for scanning - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override; - Status MakeFragment(const FileSource& source, std::shared_ptr opts, + Status MakeFragment(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) override; }; class ARROW_DS_EXPORT ParquetFragment : public FileBasedDataFragment { public: - ParquetFragment(const FileSource& source, std::shared_ptr options) - : FileBasedDataFragment(source, std::make_shared(), options) {} + ParquetFragment(const FileSource& source, const ScanOptions& scan_options) + : FileBasedDataFragment(source, std::make_shared(), + scan_options) {} bool splittable() const override { return true; } }; +class ARROW_DS_EXPORT ParquetScanOptions : public ScanOptions::FileOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } + // TODO(bkietz) probably this should wrap an ArrowReaderProperties +}; + +class ARROW_DS_EXPORT ParquetWriteOptions : public FileWriteOptions { + public: + std::shared_ptr file_format() const override { + return std::make_shared(); + } +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 058a49d6796..1ee4f471216 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -142,22 +142,15 @@ class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin { } }; -class TestParquetFileFormat : public ParquetBufferFixtureMixin { - public: - TestParquetFileFormat() : ctx_(std::make_shared()) {} - - protected: - std::shared_ptr opts_; - std::shared_ptr ctx_; -}; +class TestParquetFileFormat : public ParquetBufferFixtureMixin {}; TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto reader = GetRecordBatchReader(); auto source = GetFileSource(reader.get()); - auto fragment = std::make_shared(*source, opts_); + auto fragment = std::make_shared(*source, ScanOptions()); std::unique_ptr it; - ASSERT_OK(fragment->Scan(ctx_, &it)); + ASSERT_OK(fragment->Scan(&it)); int64_t row_count = 0; ASSERT_OK(it->Visit([&row_count](std::unique_ptr task) -> Status { diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 28c55adcc10..880141adb58 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -161,7 +161,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { public: SimplePartition(std::unique_ptr partition_key, DataFragmentVector&& data_fragments, PartitionVector&& subpartitions, - std::shared_ptr scan_options = NULLPTR) + const ScanOptions& scan_options = NULLPTR) : key_(std::move(partition_key)), data_fragments_(std::move(data_fragments)), subpartitions_(std::move(subpartitions)), @@ -171,7 +171,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { int num_subpartitions() const { return static_cast(subpartitions_.size()); } - int num_data_fragments() const { return static_cast(data_fragments__.size()); } + int num_data_fragments() const { return static_cast(data_fragments_.size()); } const PartitionVector& subpartitions() const { return subpartitions_; } const DataFragmentVector& data_fragments() const { return data_fragments_; } @@ -192,7 +192,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { std::vector> subpartitions_; /// \brief Default scan options to use for data fragments - std::shared_ptr scan_options_; + ScanOptions scan_options_; }; /// \brief A PartitionSource that returns fragments as the result of input iterators diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 4ad0f6a537c..7b7e2c08065 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -31,8 +31,7 @@ std::unique_ptr SimpleScanTask::Scan() { /// \brief GetFragmentsIterator transforms a vector in a flattened /// Iterator. static std::unique_ptr GetFragmentsIterator( - const std::vector>& sources, - std::shared_ptr options) { + const std::vector>& sources, const ScanOptions& options) { // Iterator auto sources_it = MakeVectorIterator(sources); @@ -50,12 +49,11 @@ static std::unique_ptr GetFragmentsIterator( /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. static std::unique_ptr GetScanTaskIterator( - std::unique_ptr fragments, - std::shared_ptr context) { + std::unique_ptr fragments) { // DataFragment -> ScanTaskIterator - auto fn = [context](std::shared_ptr fragment, - std::unique_ptr* out) -> Status { - return fragment->Scan(context, out); + auto fn = [](std::shared_ptr fragment, + std::unique_ptr* out) -> Status { + return fragment->Scan(out); }; // Iterator> @@ -73,11 +71,10 @@ std::unique_ptr SimpleScanner::Scan() { // Second, transforms Iterator into a unified // Iterator. The first Iterator::Next invocation is going to do // all the work of unwinding the chained iterators. - return GetScanTaskIterator(std::move(fragments_it), context_); + return GetScanTaskIterator(std::move(fragments_it)); } -ScannerBuilder::ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context) - : dataset_(std::move(dataset)), scan_context_(std::move(scan_context)) {} +ScannerBuilder::ScannerBuilder(std::shared_ptr dataset) + : dataset_(std::move(dataset)) {} ScannerBuilder* ScannerBuilder::Project(const std::vector& columns) { return this; @@ -88,7 +85,7 @@ ScannerBuilder* ScannerBuilder::AddFilter(const std::shared_ptr& filter) } ScannerBuilder* ScannerBuilder::SetGlobalFileOptions( - std::shared_ptr options) { + std::shared_ptr options) { return this; } @@ -98,8 +95,7 @@ ScannerBuilder* ScannerBuilder::IncludePartitionKeys(bool include) { } Status ScannerBuilder::Finish(std::unique_ptr* out) const { - auto options = std::make_shared(); - out->reset(new SimpleScanner(dataset_->sources(), options, scan_context_)); + out->reset(new SimpleScanner(dataset_->sources(), ScanOptions())); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 9177a5418d7..6dd22fceb90 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -29,31 +29,73 @@ namespace arrow { namespace dataset { -/// \brief Shared state for a Scan operation -struct ARROW_DS_EXPORT ScanContext { - MemoryPool* pool = arrow::default_memory_pool(); -}; - -// TODO(wesm): API for handling of post-materialization filters. For -// example, if the user requests [$col1 > 0, $col2 > 0] and $col1 is a -// partition key, but $col2 is not, then the filter "$col2 > 0" must -// be evaluated in-memory against the RecordBatch objects resulting -// from the Scan - -class ARROW_DS_EXPORT ScanOptions { +/// Container for scan state +struct ARROW_DS_EXPORT ScanOptions final { public: - virtual ~ScanOptions() = default; + ScanOptions() = default; + /// Filters const std::shared_ptr& selector() const { return selector_; } + ScanOptions& selector(std::shared_ptr s) { + selector_ = std::move(s); + return *this; + } + + /// Schema to which record batches will be projected const std::shared_ptr& schema() const { return schema_; } + ScanOptions& schema(std::shared_ptr s) { + schema_ = std::move(s); + return *this; + } + + /// MemoryPool used for allocating temporary memory and yielded record batches + MemoryPool* pool() const { return pool_; } + + ScanOptions& pool(MemoryPool* p) { + pool_ = p; + return *this; + } + + /// \brief Base class for format specific file scanning options + class ARROW_DS_EXPORT FileOptions { + public: + /// \brief The file format this options corresponds to + virtual std::shared_ptr file_format() const = 0; + + virtual ~FileOptions() = default; + }; + + /// Format-specific file scanning options + const std::vector>& file_options() const { + return file_options_; + } + + ScanOptions& AddFileOptions(std::shared_ptr file_options) { + file_options_.push_back(std::move(file_options)); + return *this; + } + + /// Include columns generated from partition keys + bool include_partition_keys() const { return include_partition_keys_; } + + ScanOptions& include_partition_keys(bool include_partition_keys) { + include_partition_keys_ = include_partition_keys; + return *this; + } + protected: - // Filters + bool include_partition_keys_; + std::shared_ptr selector_; // Schema to which record batches will be reconciled std::shared_ptr schema_; + + MemoryPool* pool_ = default_memory_pool(); + + std::vector> file_options_; }; /// \brief Read record batches from a range of a single data fragment. A @@ -115,32 +157,26 @@ class ARROW_DS_EXPORT Scanner { /// returning a ScanTaskIterator. class ARROW_DS_EXPORT SimpleScanner : public Scanner { public: - SimpleScanner(std::vector> sources, - std::shared_ptr options, - std::shared_ptr context) - : sources_(std::move(sources)), - options_(std::move(options)), - context_(std::move(context)) {} + SimpleScanner(std::vector> sources, ScanOptions options) + : sources_(std::move(sources)), options_(std::move(options)) {} std::unique_ptr Scan() override; private: std::vector> sources_; - std::shared_ptr options_; - std::shared_ptr context_; + ScanOptions options_; }; class ARROW_DS_EXPORT ScannerBuilder { public: - ScannerBuilder(std::shared_ptr dataset, - std::shared_ptr scan_context); + explicit ScannerBuilder(std::shared_ptr dataset); /// \brief Set ScannerBuilder* Project(const std::vector& columns); ScannerBuilder* AddFilter(const std::shared_ptr& filter); - ScannerBuilder* SetGlobalFileOptions(std::shared_ptr options); + ScannerBuilder* SetGlobalFileOptions(std::shared_ptr options); /// \brief If true (default), add partition keys to the /// RecordBatches that the scan produces if they are not in the data @@ -152,7 +188,6 @@ class ARROW_DS_EXPORT ScannerBuilder { private: std::shared_ptr dataset_; - std::shared_ptr scan_context_; std::vector project_columns_; FilterVector filters_; bool include_partition_keys_; diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 3168bdd0745..df691b9c254 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -45,7 +45,7 @@ TEST_F(TestSimpleScanner, Scan) { const int64_t total_batches = sources.size() * kNumberBatches * kNumberFragments; auto reader = ConstantArrayGenerator::Repeat(total_batches, batch); - SimpleScanner scanner{sources, options_, ctx_}; + SimpleScanner scanner{sources, ScanOptions()}; // Verifies that the unified BatchReader is equivalent to flattening all the // structures of the scanner, i.e. Scanner[DataSource[ScanTask[RecordBatch]]] diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index c4cbaef0dd7..f37563d6896 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -65,8 +65,6 @@ void EnsureRecordBatchReaderDrained(RecordBatchReader* reader) { class DatasetFixtureMixin : public ::testing::Test { public: - DatasetFixtureMixin() : ctx_(std::make_shared()) {} - /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by the data fragment. void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, @@ -90,7 +88,7 @@ class DatasetFixtureMixin : public ::testing::Test { void AssertFragmentEquals(RecordBatchReader* expected, DataFragment* fragment, bool ensure_drained = true) { std::unique_ptr it; - ARROW_EXPECT_OK(fragment->Scan(ctx_, &it)); + ARROW_EXPECT_OK(fragment->Scan(&it)); ARROW_EXPECT_OK(it->Visit([&](std::unique_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); @@ -106,7 +104,7 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragments of a source. void AssertDataSourceEquals(RecordBatchReader* expected, DataSource* source, bool ensure_drained = true) { - auto it = source->GetFragments(options_); + auto it = source->GetFragments(scan_options_); ARROW_EXPECT_OK(it->Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); @@ -152,8 +150,7 @@ class DatasetFixtureMixin : public ::testing::Test { } protected: - std::shared_ptr options_ = nullptr; - std::shared_ptr ctx_; + ScanOptions scan_options_; }; template @@ -163,6 +160,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { void SetUp() override { format_ = std::make_shared(); + schema_ = schema({field("dummy", null())}); + scan_options_.schema(schema_); ASSERT_OK( TemporaryDir::Make("test-fsdatasource-" + format_->name() + "-", &temp_dir_)); @@ -187,8 +186,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { } void MakeDataSource() { - ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, - std::make_shared(), &source_)); + ASSERT_OK(FileSystemBasedDataSource::Make(fs_.get(), selector_, format_, &source_)); } protected: @@ -197,17 +195,17 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - ++count; - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + ASSERT_OK(source_->GetFragments(scan_options_) + ->Visit([&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + ++count; + auto extension = fs::internal::GetAbstractPathExtension( + file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); ASSERT_EQ(count, 1); } @@ -218,17 +216,17 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { MakeDataSource(); int count = 0; - ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - ++count; - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + ASSERT_OK(source_->GetFragments(scan_options_) + ->Visit([&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + ++count; + auto extension = fs::internal::GetAbstractPathExtension( + file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); ASSERT_EQ(count, 4); } @@ -242,15 +240,16 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { ASSERT_RAISES( IOError, - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { - auto file_fragment = - internal::checked_pointer_cast(fragment); - auto extension = - fs::internal::GetAbstractPathExtension(file_fragment->source().path()); - EXPECT_TRUE(format_->IsKnownExtension(extension)); - std::shared_ptr f; - return this->fs_->OpenInputFile(file_fragment->source().path(), &f); - })); + source_->GetFragments(scan_options_) + ->Visit([&](std::shared_ptr fragment) { + auto file_fragment = + internal::checked_pointer_cast(fragment); + auto extension = + fs::internal::GetAbstractPathExtension(file_fragment->source().path()); + EXPECT_TRUE(format_->IsKnownExtension(extension)); + std::shared_ptr f; + return this->fs_->OpenInputFile(file_fragment->source().path(), &f); + })); } fs::Selector selector_; @@ -259,6 +258,8 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { std::shared_ptr fs_; std::unique_ptr temp_dir_; std::shared_ptr format_; + ScanOptions scan_options_; + std::shared_ptr schema_; }; template @@ -276,30 +277,29 @@ class DummyFileFormat : public FileFormat { bool IsKnownExtension(const std::string& ext) const override { return ext == name(); } /// \brief Open a file for scanning (always returns an empty iterator) - Status ScanFile(const FileSource& source, std::shared_ptr scan_options, - std::shared_ptr scan_context, + Status ScanFile(const FileSource& source, const ScanOptions& scan_options, std::unique_ptr* out) const override { *out = internal::make_unique>>(); return Status::OK(); } - inline Status MakeFragment(const FileSource& location, - std::shared_ptr opts, + inline Status MakeFragment(const FileSource& location, const ScanOptions& scan_options, std::unique_ptr* out) override; }; class DummyFragment : public FileBasedDataFragment { public: - DummyFragment(const FileSource& source, std::shared_ptr options) - : FileBasedDataFragment(source, std::make_shared(), options) {} + DummyFragment(const FileSource& source, const ScanOptions& scan_options) + : FileBasedDataFragment(source, std::make_shared(), scan_options) { + } bool splittable() const override { return false; } }; Status DummyFileFormat::MakeFragment(const FileSource& source, - std::shared_ptr opts, + const ScanOptions& scan_options, std::unique_ptr* out) { - *out = internal::make_unique(source, opts); + *out = internal::make_unique(source, scan_options); return Status::OK(); } diff --git a/cpp/src/arrow/dataset/type_fwd.h b/cpp/src/arrow/dataset/type_fwd.h index 8e3824625ed..b3f72c4099d 100644 --- a/cpp/src/arrow/dataset/type_fwd.h +++ b/cpp/src/arrow/dataset/type_fwd.h @@ -44,7 +44,6 @@ struct DiscoveryOptions; class FileBasedDataFragment; class FileFormat; -class FileScanOptions; class FileWriteOptions; class Filter; @@ -56,8 +55,7 @@ class PartitionScheme; using PartitionVector = std::vector>; using PartitionIterator = Iterator>; -struct ScanContext; -class ScanOptions; +struct ScanOptions; class Scanner; class ScannerBuilder; class ScanTask; diff --git a/cpp/src/arrow/dataset/writer.h b/cpp/src/arrow/dataset/writer.h index 048a0e54d75..741f63691b6 100644 --- a/cpp/src/arrow/dataset/writer.h +++ b/cpp/src/arrow/dataset/writer.h @@ -18,8 +18,6 @@ #pragma once #include -#include -#include #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" @@ -27,9 +25,13 @@ namespace arrow { namespace dataset { -class ARROW_DS_EXPORT WriteOptions { +/// \brief Base class for file writing options +class ARROW_DS_EXPORT FileWriteOptions { public: - virtual ~WriteOptions() = default; + /// \brief The file format this options corresponds to + virtual std::shared_ptr file_format() const = 0; + + virtual ~FileWriteOptions() = default; }; } // namespace dataset