From ca0bdf4a73742768f2128cab0212cbfe44835d26 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Wed, 11 Sep 2019 11:20:01 -0400 Subject: [PATCH 1/6] ARROW-6558: [C++] Refactor Iterator to type erased handle --- cpp/src/arrow/dataset/dataset.cc | 2 +- cpp/src/arrow/dataset/dataset.h | 11 +- cpp/src/arrow/dataset/dataset_internal.h | 13 +- cpp/src/arrow/dataset/file_base.cc | 6 +- cpp/src/arrow/dataset/file_base.h | 8 +- cpp/src/arrow/dataset/file_csv.h | 2 +- cpp/src/arrow/dataset/file_feather.h | 2 +- cpp/src/arrow/dataset/file_json.h | 2 +- cpp/src/arrow/dataset/file_parquet.cc | 16 +- cpp/src/arrow/dataset/file_parquet.h | 2 +- cpp/src/arrow/dataset/file_parquet_test.cc | 33 ++-- cpp/src/arrow/dataset/partition.h | 11 +- cpp/src/arrow/dataset/scanner.cc | 20 +-- cpp/src/arrow/dataset/scanner.h | 8 +- cpp/src/arrow/dataset/test_util.h | 20 +-- cpp/src/arrow/record_batch.cc | 6 +- cpp/src/arrow/record_batch.h | 10 +- cpp/src/arrow/table.cc | 131 ++++++-------- cpp/src/arrow/table.h | 10 +- cpp/src/arrow/util/functional.h | 5 + cpp/src/arrow/util/iterator.h | 195 +++++++++++++-------- cpp/src/arrow/util/iterator_test.cc | 125 +++++++------ cpp/src/parquet/arrow/reader.cc | 24 --- cpp/src/parquet/arrow/reader.h | 5 - 24 files changed, 336 insertions(+), 331 deletions(-) diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 66e25d404d0..548f90612b3 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -31,7 +31,7 @@ SimpleDataFragment::SimpleDataFragment( : record_batches_(std::move(record_batches)) {} Status SimpleDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { + ScanTaskIterator* out) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. auto it = MakeVectorIterator(record_batches_); diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index a30c977a72c..4e75112ad4a 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -39,7 +39,7 @@ class ARROW_DS_EXPORT DataFragment { /// \brief Scan returns an iterator of ScanTasks, each of which yields /// RecordBatches from this DataFragment. virtual Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) = 0; + ScanTaskIterator* out) = 0; /// \brief Return true if the fragment can benefit from parallel /// scanning @@ -60,8 +60,7 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { public: explicit SimpleDataFragment(std::vector> record_batches); - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::shared_ptr scan_context, ScanTaskIterator* out) override; bool splittable() const override { return false; } @@ -78,8 +77,7 @@ class ARROW_DS_EXPORT DataSource { public: /// \brief GetFragments returns an iterator of DataFragments. The ScanOptions /// controls filtering and schema inference. - virtual std::unique_ptr GetFragments( - std::shared_ptr options) = 0; + virtual DataFragmentIterator GetFragments(std::shared_ptr options) = 0; virtual std::string type() const = 0; @@ -92,8 +90,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { explicit SimpleDataSource(DataFragmentVector fragments) : fragments_(std::move(fragments)) {} - std::unique_ptr GetFragments( - std::shared_ptr options) override { + DataFragmentIterator GetFragments(std::shared_ptr options) override { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 4dd42b7a865..0e7b3226706 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -163,16 +163,15 @@ constexpr int RecordBatchProjector::kNoMatch; class ProjectedRecordBatchReader : public RecordBatchReader { public: static Status Make(MemoryPool* pool, RecordBatchProjector projector, - std::unique_ptr wrapped, - std::unique_ptr* out) { - out->reset( - new ProjectedRecordBatchReader(pool, std::move(projector), std::move(wrapped))); + RecordBatchIterator wrapped, RecordBatchIterator* out) { + *out = RecordBatchIterator( + ProjectedRecordBatchReader(pool, std::move(projector), std::move(wrapped))); return Status::OK(); } Status ReadNext(std::shared_ptr* out) override { std::shared_ptr rb; - RETURN_NOT_OK(wrapped_->Next(&rb)); + RETURN_NOT_OK(wrapped_.Next(&rb)); if (rb == nullptr) { *out = nullptr; return Status::OK(); @@ -185,11 +184,11 @@ class ProjectedRecordBatchReader : public RecordBatchReader { private: ProjectedRecordBatchReader(MemoryPool* pool, RecordBatchProjector projector, - std::unique_ptr wrapped) + RecordBatchIterator wrapped) : projector_(std::move(projector)), wrapped_(std::move(wrapped)) {} RecordBatchProjector projector_; - std::unique_ptr wrapped_; + RecordBatchIterator wrapped_; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index acc45b8bf9a..20f4944c830 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -41,7 +41,7 @@ Status FileSource::Open(std::shared_ptr* out) const } Status FileBasedDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { + ScanTaskIterator* out) { return format_->ScanFile(source_, scan_options_, scan_context, out); } @@ -75,7 +75,7 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, return Status::OK(); } -std::unique_ptr FileSystemBasedDataSource::GetFragments( +DataFragmentIterator FileSystemBasedDataSource::GetFragments( std::shared_ptr options) { struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, @@ -105,7 +105,7 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( std::vector stats_; }; - return internal::make_unique(filesystem_, format_, options, stats_); + return DataFragmentIterator(Impl(filesystem_, format_, options, stats_)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 2053342dcf7..f3add99b409 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -131,7 +131,7 @@ class ARROW_DS_EXPORT FileFormat { virtual Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const = 0; + ScanTaskIterator* out) const = 0; /// \brief Open a fragment virtual Status MakeFragment(const FileSource& location, @@ -148,8 +148,7 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { format_(std::move(format)), scan_options_(std::move(scan_options)) {} - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::shared_ptr scan_context, ScanTaskIterator* out) override; const FileSource& source() const { return source_; } std::shared_ptr format() const { return format_; } @@ -176,8 +175,7 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { std::string type() const override { return "directory"; } - std::unique_ptr GetFragments( - std::shared_ptr options) override; + DataFragmentIterator GetFragments(std::shared_ptr options) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 6b02085a53c..ea669713b2a 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -62,7 +62,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index c91585586ad..a9d6b025859 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -49,7 +49,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index e24cc0d9c45..d3023693c7a 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -54,7 +54,7 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index b761ab24814..77c3a71e2d3 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -54,8 +54,8 @@ class ParquetScanTask : public ScanTask { return Status::OK(); } - std::unique_ptr Scan() override { - return std::move(record_batch_reader_); + RecordBatchIterator Scan() { + return MakePointerIterator(std::move(record_batch_reader_)); } private: @@ -107,11 +107,11 @@ class ParquetRowGroupPartitioner { int num_row_groups_; }; -class ParquetScanTaskIterator : public ScanTaskIterator { +class ParquetScanTaskIterator { public: static Status Make(std::shared_ptr options, std::shared_ptr context, ParquetFileReaderPtr reader, - std::unique_ptr* out) { + ScanTaskIterator* out) { auto metadata = reader->metadata(); std::vector columns_projection; @@ -121,13 +121,13 @@ class ParquetScanTaskIterator : public ScanTaskIterator { RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader), &arrow_reader)); - out->reset(new ParquetScanTaskIterator(columns_projection, metadata, - std::move(arrow_reader))); + *out = ScanTaskIterator( + ParquetScanTaskIterator(columns_projection, metadata, std::move(arrow_reader))); return Status::OK(); } - Status Next(ScanTaskPtr* task) override { + Status Next(ScanTaskPtr* task) { auto partition = partitionner_.Next(); // Iteration is done. @@ -166,7 +166,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { Status ParquetFileFormat::ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const { + ScanTaskIterator* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index fd462588317..a04f4596f87 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -50,7 +50,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; Status MakeFragment(const FileSource& source, std::shared_ptr opts, std::unique_ptr* out) override; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 058a49d6796..e4c2ccd080c 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -60,8 +60,8 @@ Status WriteRecordBatch(const RecordBatch& batch, FileWriter* writer) { return Status::OK(); } -Status WriteRecordBatchReader(RecordBatchReader* reader, FileWriter* writer) { - auto schema = reader->schema(); +Status WriteRecordBatchReader(RecordBatchReader&& reader, FileWriter* writer) { + auto schema = reader.schema(); if (!schema->Equals(*writer->schema(), false)) { return Status::Invalid("RecordBatch schema does not match this writer's. batch:'", @@ -69,33 +69,34 @@ Status WriteRecordBatchReader(RecordBatchReader* reader, FileWriter* writer) { "'"); } - return reader->Visit([&](std::shared_ptr batch) -> Status { - return WriteRecordBatch(*batch, writer); - }); + return MakePointerIterator(&reader).Visit( + [&](std::shared_ptr batch) -> Status { + return WriteRecordBatch(*batch, writer); + }); } Status WriteRecordBatchReader( - RecordBatchReader* reader, MemoryPool* pool, + RecordBatchReader&& reader, MemoryPool* pool, const std::shared_ptr& sink, const std::shared_ptr& properties = default_writer_properties(), const std::shared_ptr& arrow_properties = default_arrow_writer_properties()) { std::unique_ptr writer; - RETURN_NOT_OK(FileWriter::Open(*reader->schema(), pool, sink, properties, + RETURN_NOT_OK(FileWriter::Open(*reader.schema(), pool, sink, properties, arrow_properties, &writer)); - RETURN_NOT_OK(WriteRecordBatchReader(reader, writer.get())); + RETURN_NOT_OK(WriteRecordBatchReader(std::move(reader), writer.get())); return writer->Close(); } class ArrowParquetWriterMixin : public ::testing::Test { public: - std::shared_ptr Write(RecordBatchReader* reader) { + std::shared_ptr Write(RecordBatchReader&& reader) { auto pool = ::arrow::default_memory_pool(); std::shared_ptr out; auto sink = CreateOutputStream(pool); - ARROW_EXPECT_OK(WriteRecordBatchReader(reader, pool, sink)); + ARROW_EXPECT_OK(WriteRecordBatchReader(std::move(reader), pool, sink)); ARROW_EXPECT_OK(sink->Finish(&out)); return out; @@ -116,8 +117,8 @@ class ArrowParquetWriterMixin : public ::testing::Test { class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin { public: - std::unique_ptr GetFileSource(RecordBatchReader* reader) { - auto buffer = Write(reader); + std::unique_ptr GetFileSource(RecordBatchReader&& reader) { + auto buffer = Write(std::move(reader)); return internal::make_unique(std::move(buffer)); } @@ -153,16 +154,16 @@ class TestParquetFileFormat : public ParquetBufferFixtureMixin { TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto reader = GetRecordBatchReader(); - auto source = GetFileSource(reader.get()); + auto source = GetFileSource(std::move(*reader)); auto fragment = std::make_shared(*source, opts_); - std::unique_ptr it; + ScanTaskIterator it; ASSERT_OK(fragment->Scan(ctx_, &it)); int64_t row_count = 0; - ASSERT_OK(it->Visit([&row_count](std::unique_ptr task) -> Status { + ASSERT_OK(it.Visit([&row_count](std::unique_ptr task) -> Status { auto batch_it = task->Scan(); - return batch_it->Visit([&row_count](std::shared_ptr batch) -> Status { + return batch_it.Visit([&row_count](std::shared_ptr batch) -> Status { row_count += batch->num_rows(); return Status::OK(); }); diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 28c55adcc10..d6adf157432 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -151,8 +151,7 @@ class ARROW_DS_EXPORT Partition : public DataSource { /// e.g. for the top-level partitioned source container virtual const PartitionKey* key() const = 0; - virtual std::unique_ptr GetFragments( - const Selector& selector) = 0; + virtual DataFragmentIterator GetFragments(const Selector& selector) = 0; }; /// \brief Simple implementation of Partition, which consists of a @@ -176,8 +175,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { const PartitionVector& subpartitions() const { return subpartitions_; } const DataFragmentVector& data_fragments() const { return data_fragments_; } - std::unique_ptr GetFragments( - const FilterVector& filters) override; + DataFragmentIterator GetFragments(const FilterVector& filters) override; private: std::unique_ptr key_; @@ -200,13 +198,12 @@ class ARROW_DS_EXPORT LazyPartition : public Partition { public: const PartitionKey* key() const override; - std::unique_ptr GetFragments( - const& DataSelector selector) override; + DataFragmentIterator GetFragments(const& DataSelector selector) override; // TODO(wesm): Iterate over subpartitions protected: - std::unique_ptr partition_iter_; + PartitionIterator partition_iter_; // By default, once this source is consumed using GetFragments, it // cannot be consumed again. By setting this to true, we cache diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 4ad0f6a537c..1e5b8a8f2f7 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -24,21 +24,20 @@ namespace arrow { namespace dataset { -std::unique_ptr SimpleScanTask::Scan() { - return MakeVectorIterator(record_batches_); -} +RecordBatchIterator SimpleScanTask::Scan() { return MakeVectorIterator(record_batches_); } /// \brief GetFragmentsIterator transforms a vector in a flattened /// Iterator. -static std::unique_ptr GetFragmentsIterator( +static DataFragmentIterator GetFragmentsIterator( const std::vector>& sources, std::shared_ptr options) { // Iterator auto sources_it = MakeVectorIterator(sources); // DataSource -> Iterator - auto fn = [options](std::shared_ptr source) - -> std::unique_ptr { return source->GetFragments(options); }; + auto fn = [options](std::shared_ptr source) -> DataFragmentIterator { + return source->GetFragments(options); + }; // Iterator> auto fragments_it = MakeMapIterator(fn, std::move(sources_it)); @@ -49,12 +48,11 @@ static std::unique_ptr GetFragmentsIterator( /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. -static std::unique_ptr GetScanTaskIterator( - std::unique_ptr fragments, - std::shared_ptr context) { +static ScanTaskIterator GetScanTaskIterator(DataFragmentIterator fragments, + std::shared_ptr context) { // DataFragment -> ScanTaskIterator auto fn = [context](std::shared_ptr fragment, - std::unique_ptr* out) -> Status { + ScanTaskIterator* out) -> Status { return fragment->Scan(context, out); }; @@ -65,7 +63,7 @@ static std::unique_ptr GetScanTaskIterator( return MakeFlattenIterator(std::move(maybe_scantask_it)); } -std::unique_ptr SimpleScanner::Scan() { +ScanTaskIterator SimpleScanner::Scan() { // First, transforms DataSources in a flat Iterator. This // iterator is lazily constructed, i.e. DataSource::GetFragments is never // invoked. diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 9177a5418d7..023290cc8b8 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -64,7 +64,7 @@ class ARROW_DS_EXPORT ScanTask { /// \brief Iterate through sequence of materialized record batches /// resulting from the Scan. Execution semantics encapsulated in the /// particular ScanTask implementation - virtual std::unique_ptr Scan() = 0; + virtual RecordBatchIterator Scan() = 0; virtual ~ScanTask() = default; }; @@ -75,7 +75,7 @@ class ARROW_DS_EXPORT SimpleScanTask : public ScanTask { explicit SimpleScanTask(std::vector> record_batches) : record_batches_(std::move(record_batches)) {} - std::unique_ptr Scan() override; + RecordBatchIterator Scan() override; protected: std::vector> record_batches_; @@ -95,7 +95,7 @@ class ARROW_DS_EXPORT Scanner { /// \brief The Scan operator returns a stream of ScanTask. The caller is /// responsible to dispatch/schedule said tasks. Tasks should be safe to run /// in a concurrent fashion and outlive the iterator. - virtual std::unique_ptr Scan() = 0; + virtual ScanTaskIterator Scan() = 0; virtual ~Scanner() = default; }; @@ -122,7 +122,7 @@ class ARROW_DS_EXPORT SimpleScanner : public Scanner { options_(std::move(options)), context_(std::move(context)) {} - std::unique_ptr Scan() override; + ScanTaskIterator Scan() override; private: std::vector> sources_; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index c4cbaef0dd7..74b81b90641 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -72,7 +72,7 @@ class DatasetFixtureMixin : public ::testing::Test { void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, bool ensure_drained = true) { auto it = task->Scan(); - ARROW_EXPECT_OK(it->Visit([expected](std::shared_ptr rhs) -> Status { + ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr rhs) -> Status { std::shared_ptr lhs; RETURN_NOT_OK(expected->ReadNext(&lhs)); EXPECT_NE(lhs, nullptr); @@ -89,10 +89,10 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, DataFragment* fragment, bool ensure_drained = true) { - std::unique_ptr it; + ScanTaskIterator it; ARROW_EXPECT_OK(fragment->Scan(ctx_, &it)); - ARROW_EXPECT_OK(it->Visit([&](std::unique_ptr task) -> Status { + ARROW_EXPECT_OK(it.Visit([&](std::unique_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); return Status::OK(); })); @@ -108,7 +108,7 @@ class DatasetFixtureMixin : public ::testing::Test { bool ensure_drained = true) { auto it = source->GetFragments(options_); - ARROW_EXPECT_OK(it->Visit([&](std::shared_ptr fragment) -> Status { + ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); return Status::OK(); })); @@ -124,7 +124,7 @@ class DatasetFixtureMixin : public ::testing::Test { bool ensure_drained = true) { auto it = scanner->Scan(); - ARROW_EXPECT_OK(it->Visit([&](std::unique_ptr task) -> Status { + ARROW_EXPECT_OK(it.Visit([&](std::unique_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); return Status::OK(); })); @@ -198,7 +198,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { int count = 0; ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -219,7 +219,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { int count = 0; ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -242,7 +242,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { ASSERT_RAISES( IOError, - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); auto extension = @@ -278,8 +278,8 @@ class DummyFileFormat : public FileFormat { /// \brief Open a file for scanning (always returns an empty iterator) Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override { - *out = internal::make_unique>>(); + ScanTaskIterator* out) const override { + *out = MakeEmptyIterator>(); return Status::OK(); } diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 1e4ad45a3d2..c81f6d6e253 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -286,7 +286,7 @@ Status RecordBatchReader::ReadAll(std::shared_ptr* table) { class SimpleRecordBatchReader : public RecordBatchReader { public: - SimpleRecordBatchReader(std::unique_ptr>> it, + SimpleRecordBatchReader(Iterator> it, std::shared_ptr schema) : schema_(schema), it_(std::move(it)) {} @@ -295,14 +295,14 @@ class SimpleRecordBatchReader : public RecordBatchReader { : schema_(schema), it_(MakeVectorIterator(batches)) {} Status ReadNext(std::shared_ptr* batch) override { - return it_->Next(batch); + return it_.Next(batch); } std::shared_ptr schema() const override { return schema_; } protected: std::shared_ptr schema_; - std::unique_ptr>> it_; + Iterator> it_; }; Status MakeRecordBatchReader(const std::vector>& batches, diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index c3b003edf68..1d8c5cd4884 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -167,12 +167,10 @@ class ARROW_EXPORT RecordBatch { }; /// \brief Abstract interface for reading stream of record batches -class ARROW_EXPORT RecordBatchReader - /// \cond FALSE - : public RecordBatchIterator -/// \endcond -{ // NOLINT whitespace/braces +class ARROW_EXPORT RecordBatchReader { public: + virtual ~RecordBatchReader() = default; + /// \return the shared schema of the record batches in the stream virtual std::shared_ptr schema() const = 0; @@ -183,7 +181,7 @@ class ARROW_EXPORT RecordBatchReader /// \return Status virtual Status ReadNext(std::shared_ptr* batch) = 0; - Status Next(std::shared_ptr* batch) final { return ReadNext(batch); } + Status Next(std::shared_ptr* batch) { return ReadNext(batch); } /// \brief Consume entire stream as a vector of record batches Status ReadAll(std::vector>* batches); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 41794d389c2..880d0f65cc9 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -555,98 +555,71 @@ Status Table::CombineChunks(MemoryPool* pool, std::shared_ptr
* out) const // ---------------------------------------------------------------------- // Convert a table to a sequence of record batches -class TableBatchReader::TableBatchReaderImpl { - public: - explicit TableBatchReaderImpl(const Table& table) - : table_(table), - column_data_(table.num_columns()), - chunk_numbers_(table.num_columns(), 0), - chunk_offsets_(table.num_columns(), 0), - absolute_row_position_(0), - max_chunksize_(std::numeric_limits::max()) { - for (int i = 0; i < table.num_columns(); ++i) { - column_data_[i] = table.column(i).get(); - } +TableBatchReader::TableBatchReader(const Table& table) + : table_(table), + column_data_(table.num_columns()), + chunk_numbers_(table.num_columns(), 0), + chunk_offsets_(table.num_columns(), 0), + absolute_row_position_(0), + max_chunksize_(std::numeric_limits::max()) { + for (int i = 0; i < table.num_columns(); ++i) { + column_data_[i] = table.column(i).get(); } +} - Status ReadNext(std::shared_ptr* out) { - if (absolute_row_position_ == table_.num_rows()) { - *out = nullptr; - return Status::OK(); - } - - // Determine the minimum contiguous slice across all columns - int64_t chunksize = std::min(table_.num_rows(), max_chunksize_); - std::vector chunks(table_.num_columns()); - for (int i = 0; i < table_.num_columns(); ++i) { - auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get(); - int64_t chunk_remaining = chunk->length() - chunk_offsets_[i]; - - if (chunk_remaining < chunksize) { - chunksize = chunk_remaining; - } - - chunks[i] = chunk; - } - - // Slice chunks and advance chunk index as appropriate - std::vector> batch_data(table_.num_columns()); - - for (int i = 0; i < table_.num_columns(); ++i) { - // Exhausted chunk - const Array* chunk = chunks[i]; - const int64_t offset = chunk_offsets_[i]; - std::shared_ptr slice_data; - if ((chunk->length() - offset) == chunksize) { - ++chunk_numbers_[i]; - chunk_offsets_[i] = 0; - if (offset > 0) { - // Need to slice - slice_data = chunk->Slice(offset, chunksize)->data(); - } else { - // No slice - slice_data = chunk->data(); - } - } else { - chunk_offsets_[i] += chunksize; - slice_data = chunk->Slice(offset, chunksize)->data(); - } - batch_data[i] = std::move(slice_data); - } +std::shared_ptr TableBatchReader::schema() const { return table_.schema(); } - absolute_row_position_ += chunksize; - *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data)); +void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; } +Status TableBatchReader::ReadNext(std::shared_ptr* out) { + if (absolute_row_position_ == table_.num_rows()) { + *out = nullptr; return Status::OK(); } - std::shared_ptr schema() const { return table_.schema(); } + // Determine the minimum contiguous slice across all columns + int64_t chunksize = std::min(table_.num_rows(), max_chunksize_); + std::vector chunks(table_.num_columns()); + for (int i = 0; i < table_.num_columns(); ++i) { + auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get(); + int64_t chunk_remaining = chunk->length() - chunk_offsets_[i]; - void set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; } - - private: - const Table& table_; - std::vector column_data_; - std::vector chunk_numbers_; - std::vector chunk_offsets_; - int64_t absolute_row_position_; - int64_t max_chunksize_; -}; + if (chunk_remaining < chunksize) { + chunksize = chunk_remaining; + } -TableBatchReader::TableBatchReader(const Table& table) { - impl_.reset(new TableBatchReaderImpl(table)); -} + chunks[i] = chunk; + } -TableBatchReader::~TableBatchReader() {} + // Slice chunks and advance chunk index as appropriate + std::vector> batch_data(table_.num_columns()); -std::shared_ptr TableBatchReader::schema() const { return impl_->schema(); } + for (int i = 0; i < table_.num_columns(); ++i) { + // Exhausted chunk + const Array* chunk = chunks[i]; + const int64_t offset = chunk_offsets_[i]; + std::shared_ptr slice_data; + if ((chunk->length() - offset) == chunksize) { + ++chunk_numbers_[i]; + chunk_offsets_[i] = 0; + if (offset > 0) { + // Need to slice + slice_data = chunk->Slice(offset, chunksize)->data(); + } else { + // No slice + slice_data = chunk->data(); + } + } else { + chunk_offsets_[i] += chunksize; + slice_data = chunk->Slice(offset, chunksize)->data(); + } + batch_data[i] = std::move(slice_data); + } -void TableBatchReader::set_chunksize(int64_t chunksize) { - impl_->set_chunksize(chunksize); -} + absolute_row_position_ += chunksize; + *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data)); -Status TableBatchReader::ReadNext(std::shared_ptr* out) { - return impl_->ReadNext(out); + return Status::OK(); } } // namespace arrow diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 514b02769df..f3ae422b9fb 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -277,8 +277,6 @@ class ARROW_EXPORT Table { /// of the table's columns. class ARROW_EXPORT TableBatchReader : public RecordBatchReader { public: - ~TableBatchReader() override; - /// \brief Construct a TableBatchReader for the given table explicit TableBatchReader(const Table& table); @@ -293,8 +291,12 @@ class ARROW_EXPORT TableBatchReader : public RecordBatchReader { void set_chunksize(int64_t chunksize); private: - class TableBatchReaderImpl; - std::unique_ptr impl_; + const Table& table_; + std::vector column_data_; + std::vector chunk_numbers_; + std::vector chunk_offsets_; + int64_t absolute_row_position_; + int64_t max_chunksize_; }; /// \brief Construct table from multiple input tables. diff --git a/cpp/src/arrow/util/functional.h b/cpp/src/arrow/util/functional.h index 73542e30021..2b5a976a34b 100644 --- a/cpp/src/arrow/util/functional.h +++ b/cpp/src/arrow/util/functional.h @@ -66,5 +66,10 @@ struct call_traits { using argument_type = decltype(argument_type_impl(&std::decay::type::operator())); }; +template +struct type_constant { + using type = T; +}; + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 328b8e946bc..7005d2063fe 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include @@ -28,84 +29,141 @@ namespace arrow { +template +class Iterator; + +/// \brief IteratorEnd returns a reserved value which indicates the end of iteration. By +/// default this is NULLPTR since most iterators yield pointer types. Specialize/overload +/// if different end semantics are required. +template +static T IteratorEnd(internal::type_constant) { + return T(NULLPTR); +} + +/// Convenience function, overload the type_constant version above and not this one. +template +static T IteratorEnd() { + return IteratorEnd(internal::type_constant{}); +} + /// \brief A generic Iterator that can return errors template class Iterator { public: - virtual ~Iterator() = default; + /// \brief Iterator may be constructed from any type which has member function + /// Status HasNext::Next(T*); + template + explicit Iterator(HasNext has_next) + : ptr_(new HasNext(std::move(has_next)), Delete), next_(Next) {} + + Iterator() : ptr_(NULLPTR, NoopDelete) {} - /// \brief Return the next element of the sequence, nullptr when the - /// iteration is completed - virtual Status Next(T* out) = 0; + /// \brief Return the next element of the sequence, IteratorEnd() when the + /// iteration is completed. Calling this on a default constructed Iterator + /// will result in undefined behavior. + Status Next(T* out) { return next_(ptr_.get(), out); } /// Pass each element of the sequence to a visitor. Will return any error status /// returned by the visitor, terminating iteration. template Status Visit(Visitor&& visitor) { Status status; - T value; - for (;;) { + for (T value, end = IteratorEnd();;) { status = Next(&value); if (!status.ok()) return status; - if (value == NULLPTR) break; + if (value == end) break; ARROW_RETURN_NOT_OK(visitor(std::move(value))); } return status; } + + bool operator==(const Iterator& other) const { return ptr_ == other.ptr_; } + + explicit operator bool() const { return ptr_ == NULLPTR; } + + private: + template + static void Delete(void* ptr) { + delete static_cast(ptr); + } + + static void NoopDelete(void*) {} + + template + static Status Next(void* ptr, T* out) { + return static_cast(ptr)->Next(out); + } + + std::unique_ptr ptr_; + Status (*next_)(void*, T*) = NULLPTR; + + friend Iterator IteratorEnd(internal::type_constant) { + // end condition for an Iterator of Iterators is a default constructed (null) iterator + return Iterator(); + } }; -template -class FunctionIterator : public Iterator { +template +class PointerIterator { public: - using IteratorType = Iterator; - - explicit FunctionIterator(Fn fn) : fn_(std::move(fn)) {} + explicit PointerIterator(Ptr ptr) : ptr_(std::move(ptr)) {} - Status Next(T* out) override { return fn_(out); } + Status Next(T* out) { return ptr_->Next(out); } private: - Fn fn_; + Ptr ptr_; }; -template >::type> -std::unique_ptr> MakeFunctionIterator(Fn fn) { - return std::unique_ptr>( - new FunctionIterator(std::move(fn))); +/// \brief Construct an Iterator which dereferences a (possibly smart) pointer +/// to invoke its Next function +template ())>::type, + typename Fn = decltype(std::mem_fun(&Pointed::Next)), + typename T = typename std::remove_pointer< + internal::call_traits::argument_type<1, Fn>>::type> +Iterator MakePointerIterator(Ptr ptr) { + return Iterator(PointerIterator(std::move(ptr))); } -template -class EmptyIterator : public Iterator { +template +class FunctionIterator { public: - explicit EmptyIterator(Status s = Status::OK()) : status_(s) {} + explicit FunctionIterator(Fn fn) : fn_(std::move(fn)) {} - Status Next(T* out) override { - *out = NULLPTR; - return status_; - } + Status Next(T* out) { return fn_(out); } private: - Status status_; + Fn fn_; }; +/// \brief Construct an Iterator which invokes a callable on Next() +template >::type> +Iterator MakeFunctionIterator(Fn fn) { + return Iterator(FunctionIterator(std::move(fn))); +} + template -std::unique_ptr> MakeEmptyIterator() { - return std::unique_ptr>(new EmptyIterator()); +Iterator MakeEmptyIterator(Status s = Status::OK()) { + return MakeFunctionIterator([s](T* out) { + *out = IteratorEnd(); + return s; + }); } /// \brief Simple iterator which yields the elements of a std::vector template -class VectorIterator : public Iterator { +class VectorIterator { public: explicit VectorIterator(std::vector v) : elements_(std::move(v)) {} - Status Next(T* out) override { - *out = i_ == elements_.size() ? NULLPTR : std::move(elements_[i_++]); + Status Next(T* out) { + *out = i_ == elements_.size() ? IteratorEnd() : std::move(elements_[i_++]); return Status::OK(); } @@ -115,8 +173,8 @@ class VectorIterator : public Iterator { }; template -std::unique_ptr> MakeVectorIterator(std::vector v) { - return std::unique_ptr>(new VectorIterator(std::move(v))); +Iterator MakeVectorIterator(std::vector v) { + return Iterator(VectorIterator(std::move(v))); } /// \brief MapIterator takes ownership of an iterator and a function to apply @@ -125,33 +183,32 @@ template >::type, typename O = typename std::result_of::type> -class MapIterator : public Iterator { +class MapIterator { public: - explicit MapIterator(Fn map, std::unique_ptr> it) + explicit MapIterator(Fn map, Iterator it) : map_(std::move(map)), it_(std::move(it)) {} - Status Next(O* out) override { + Status Next(O* out) { I i; - ARROW_RETURN_NOT_OK(it_->Next(&i)); + ARROW_RETURN_NOT_OK(it_.Next(&i)); // Ensure loops exit. - *out = (i == NULLPTR) ? NULLPTR : map_(std::move(i)); + *out = i == IteratorEnd() ? IteratorEnd() : map_(std::move(i)); return Status::OK(); } private: Fn map_; - std::unique_ptr> it_; + Iterator it_; }; template >::type, typename O = typename std::result_of::type> -std::unique_ptr> MakeMapIterator(Fn map, std::unique_ptr> it) { - return std::unique_ptr>( - new MapIterator(std::move(map), std::move(it))); +Iterator MakeMapIterator(Fn map, Iterator it) { + return Iterator(MapIterator(std::move(map), std::move(it))); } /// \brief Like MapIterator, but where the function can fail. @@ -161,17 +218,16 @@ template < typename std::remove_pointer>::type, typename O = typename std::remove_pointer>::typ> -class MaybeMapIterator : public Iterator { +class MaybeMapIterator { public: - explicit MaybeMapIterator(Fn map, std::unique_ptr> it) - : map_(map), it_(std::move(it)) {} + explicit MaybeMapIterator(Fn map, Iterator it) : map_(map), it_(std::move(it)) {} - Status Next(O* out) override { + Status Next(O* out) { I i; - ARROW_RETURN_NOT_OK(it_->Next(&i)); - if (i == NULLPTR) { - *out = NULLPTR; + ARROW_RETURN_NOT_OK(it_.Next(&i)); + if (i == IteratorEnd()) { + *out = IteratorEnd(); return Status::OK(); } @@ -180,7 +236,7 @@ class MaybeMapIterator : public Iterator { private: Fn map_; - std::unique_ptr> it_; + Iterator it_; }; template < @@ -189,38 +245,36 @@ template < typename std::remove_pointer>::type, typename O = typename std::remove_pointer>::type> -std::unique_ptr> MakeMaybeMapIterator(Fn map, - std::unique_ptr> it) { - return std::unique_ptr>( - new MaybeMapIterator(std::move(map), std::move(it))); +Iterator MakeMaybeMapIterator(Fn map, Iterator it) { + return Iterator(MaybeMapIterator(std::move(map), std::move(it))); } /// \brief FlattenIterator takes an iterator generating iterators and yields a /// unified iterator that flattens/concatenates in a single stream. -template >> -class FlattenIterator : public Iterator { +template +class FlattenIterator { public: - explicit FlattenIterator(std::unique_ptr> it) : parent_(std::move(it)) {} + explicit FlattenIterator(Iterator> it) : parent_(std::move(it)) {} - Status Next(T* out) override { + Status Next(T* out) { if (done_) { - *out = NULLPTR; + *out = IteratorEnd(); return Status::OK(); } - if (child_ == NULLPTR) { + if (child_ == IteratorEnd>()) { // Pop from parent's iterator. - ARROW_RETURN_NOT_OK(parent_->Next(&child_)); + ARROW_RETURN_NOT_OK(parent_.Next(&child_)); // Check if final iteration reached. - done_ = (child_ == NULLPTR); + done_ = (child_ == IteratorEnd>()); return Next(out); } // Pop from child_ and lookout for depletion. - ARROW_RETURN_NOT_OK(child_->Next(out)); - if (*out == NULLPTR) { + ARROW_RETURN_NOT_OK(child_.Next(out)); + if (*out == IteratorEnd()) { // Reset state such that we pop from parent on the recursive call - child_ = NULLPTR; + child_ = IteratorEnd>(); return Next(out); } @@ -228,17 +282,16 @@ class FlattenIterator : public Iterator { } private: - std::unique_ptr> parent_; - std::unique_ptr> child_ = NULLPTR; + Iterator> parent_; + Iterator child_ = IteratorEnd>(); // The usage of done_ could be avoided by setting parent_ to null, but this // would hamper debugging. bool done_ = false; }; template -std::unique_ptr> MakeFlattenIterator( - std::unique_ptr>>> it) { - return std::unique_ptr>(new FlattenIterator(std::move(it))); +Iterator MakeFlattenIterator(Iterator> it) { + return Iterator(FlattenIterator(std::move(it))); } } // namespace arrow diff --git a/cpp/src/arrow/util/iterator_test.cc b/cpp/src/arrow/util/iterator_test.cc index 4843d39cea9..edb85c1db43 100644 --- a/cpp/src/arrow/util/iterator_test.cc +++ b/cpp/src/arrow/util/iterator_test.cc @@ -26,7 +26,7 @@ namespace arrow { template -std::vector IteratorToVector(std::unique_ptr>& iterator) { +std::vector IteratorToVector(Iterator iterator) { std::vector out; auto fn = [&out](T value) -> Status { @@ -34,91 +34,104 @@ std::vector IteratorToVector(std::unique_ptr>& iterator) { return Status::OK(); }; - ARROW_EXPECT_OK(iterator->Visit(fn)); + ARROW_EXPECT_OK(iterator.Visit(fn)); return out; } -#define EmptyIt MakeEmptyIterator -#define VectorIt MakeVectorIterator -#define FlattenIt MakeFlattenIterator +struct TestInt { + TestInt() : value(-999) {} + TestInt(int i) : value(i) {} // NOLINT runtime/explicit + int value; -// Iterators works on pointers -static const int one = 1; -static const int two = 2; -static const int three = 3; + bool operator==(const TestInt& other) const { return value == other.value; } + + friend TestInt IteratorEnd(internal::type_constant) { return TestInt(); } +}; template -void AssertIteratorMatch(std::vector expected, std::unique_ptr> actual) { - EXPECT_EQ(expected, IteratorToVector(actual)); +inline Iterator EmptyIt() { + return MakeEmptyIterator(); +} + +inline Iterator VectorIt(std::vector v) { + return MakeVectorIterator(std::move(v)); } template -void AssertIteratorNoMatch(std::vector expected, std::unique_ptr> actual) { - EXPECT_NE(expected, IteratorToVector(actual)); +inline Iterator VectorIt(std::vector v) { + return MakeVectorIterator(std::move(v)); } -TEST(TestEmptyIterator, Basic) { AssertIteratorMatch({}, EmptyIt()); } +template +inline Iterator FlattenIt(Iterator> its) { + return MakeFlattenIterator(std::move(its)); +} -TEST(TestVectorIterator, Basic) { - std::vector input{&one, &two, &three}; +template +void AssertIteratorMatch(std::vector expected, Iterator actual) { + EXPECT_EQ(expected, IteratorToVector(std::move(actual))); +} - AssertIteratorMatch({}, VectorIt(std::vector{})); - AssertIteratorMatch({&one, &two, &three}, VectorIt(input)); +template +void AssertIteratorNoMatch(std::vector expected, Iterator actual) { + EXPECT_NE(expected, IteratorToVector(std::move(actual))); +} - AssertIteratorNoMatch({&one}, VectorIt(std::vector{})); - AssertIteratorNoMatch({}, VectorIt(input)); - AssertIteratorNoMatch({&one, &two, &two}, VectorIt(input)); - AssertIteratorNoMatch({&one, &two, &three, &one}, VectorIt(input)); +TEST(TestEmptyIterator, Basic) { AssertIteratorMatch({}, EmptyIt()); } + +TEST(TestVectorIterator, Basic) { + AssertIteratorMatch({}, VectorIt({})); + AssertIteratorMatch({1, 2, 3}, VectorIt({1, 2, 3})); + + AssertIteratorNoMatch({1}, VectorIt({})); + AssertIteratorNoMatch({}, VectorIt({1, 2, 3})); + AssertIteratorNoMatch({1, 2, 2}, VectorIt({1, 2, 3})); + AssertIteratorNoMatch({1, 2, 3, 1}, VectorIt({1, 2, 3})); } TEST(FlattenVectorIterator, Basic) { - std::vector input{&one, &two, &three}; - - // Flatten expects to consume Iterator>> - AssertIteratorMatch({}, FlattenIt(EmptyIt>>())); - - // unique_ptr and initializer lists is a nono - std::vector>> ok{3}; - ok[0] = VectorIt(std::vector{&one}); - ok[1] = VectorIt(std::vector{&two}); - ok[2] = VectorIt(std::vector{&three}); - AssertIteratorMatch(input, FlattenIt(VectorIt(std::move(ok)))); - - std::vector>> not_enough{2}; - not_enough[0] = VectorIt(std::vector{&one}); - not_enough[1] = VectorIt(std::vector{&two}); - AssertIteratorNoMatch(input, FlattenIt(VectorIt(std::move(not_enough)))); - - std::vector>> too_much{4}; - too_much[0] = VectorIt(std::vector{&one}); - too_much[1] = VectorIt(std::vector{&two}); - too_much[2] = VectorIt(std::vector{&three}); - too_much[3] = VectorIt(std::vector{&two}); - AssertIteratorNoMatch(input, FlattenIt(VectorIt(std::move(too_much)))); + // Flatten expects to consume Iterator> + AssertIteratorMatch({}, FlattenIt(EmptyIt>())); + + std::vector> ok; + ok.push_back(VectorIt({1})); + ok.push_back(VectorIt({2})); + ok.push_back(VectorIt({3})); + AssertIteratorMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(ok)))); + + std::vector> not_enough; + not_enough.push_back(VectorIt({1})); + not_enough.push_back(VectorIt({2})); + AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(not_enough)))); + + std::vector> too_much; + too_much.push_back(VectorIt({1})); + too_much.push_back(VectorIt({2})); + too_much.push_back(VectorIt({3})); + too_much.push_back(VectorIt({2})); + AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(too_much)))); } -template -std::unique_ptr> Join(T a, T b) { - std::vector>> joined{2}; - joined[0] = VectorIt(std::vector{a}); - joined[1] = VectorIt(std::vector{b}); +Iterator Join(TestInt a, TestInt b) { + std::vector> joined{2}; + joined[0] = VectorIt({a}); + joined[1] = VectorIt({b}); return FlattenIt(VectorIt(std::move(joined))); } -template -std::unique_ptr> Join(T a, std::unique_ptr> b) { - std::vector>> joined{2}; - joined[0] = VectorIt(std::vector{a}); +Iterator Join(TestInt a, Iterator b) { + std::vector> joined{2}; + joined[0] = VectorIt(std::vector{a}); joined[1] = std::move(b); return FlattenIt(VectorIt(std::move(joined))); } TEST(FlattenVectorIterator, Pyramid) { - auto it = Join(&one, Join(&two, Join(&two, Join(&three, Join(&three, &three))))); - AssertIteratorMatch({&one, &two, &two, &three, &three, &three}, std::move(it)); + auto it = Join(1, Join(2, Join(2, Join(3, Join(3, 3))))); + AssertIteratorMatch({1, 2, 2, 3, 3, 3}, std::move(it)); } } // namespace arrow diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 1116a1b70d1..adb7e175e5a 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -352,30 +352,6 @@ class ColumnChunkReaderImpl : public ColumnChunkReader { int row_group_index_; }; -struct RowGroupReader::Iterator : ::arrow::TableBatchReader { - explicit Iterator(const std::shared_ptr
& table) - : TableBatchReader(*table), table_(table) {} - // TableBatchReader does not take ownership of table - std::shared_ptr
table_; -}; - -Status RowGroupReader::MakeIterator( - std::unique_ptr<::arrow::RecordBatchIterator>* batches) { - std::shared_ptr<::arrow::Table> table; - RETURN_NOT_OK(ReadTable(&table)); - batches->reset(new Iterator(table)); - return Status::OK(); -} - -Status RowGroupReader::MakeIterator( - const std::vector& column_indices, - std::unique_ptr<::arrow::RecordBatchIterator>* batches) { - std::shared_ptr<::arrow::Table> table; - RETURN_NOT_OK(ReadTable(column_indices, &table)); - batches->reset(new Iterator(table)); - return Status::OK(); -} - class RowGroupReaderImpl : public RowGroupReader { public: RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index) diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index ca8ff1c36e5..a9fb978f686 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -232,11 +232,6 @@ class RowGroupReader { std::shared_ptr<::arrow::Table>* out) = 0; virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0; - ::arrow::Status MakeIterator(std::unique_ptr<::arrow::RecordBatchIterator>* batches); - - ::arrow::Status MakeIterator(const std::vector& column_indices, - std::unique_ptr<::arrow::RecordBatchIterator>* batches); - private: struct Iterator; }; From f73f4d519befe2049d2845cb732d792eb9214169 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 16 Sep 2019 12:58:34 -0400 Subject: [PATCH 2/6] address review comments --- cpp/src/arrow/util/functional.h | 6 +++++ cpp/src/arrow/util/iterator.h | 40 +++++++++++++++++++++------------ 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/util/functional.h b/cpp/src/arrow/util/functional.h index 2b5a976a34b..27ef43c2b72 100644 --- a/cpp/src/arrow/util/functional.h +++ b/cpp/src/arrow/util/functional.h @@ -71,5 +71,11 @@ struct type_constant { using type = T; }; +template +constexpr type_constant>::type> +member_function_argument_type(R (T::*fn)(A...)) { + return {}; +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 7005d2063fe..3973c997cad 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -33,7 +33,7 @@ template class Iterator; /// \brief IteratorEnd returns a reserved value which indicates the end of iteration. By -/// default this is NULLPTR since most iterators yield pointer types. Specialize/overload +/// default this is NULLPTR since most iterators yield pointer types. Overload /// if different end semantics are required. template static T IteratorEnd(internal::type_constant) { @@ -50,11 +50,11 @@ static T IteratorEnd() { template class Iterator { public: - /// \brief Iterator may be constructed from any type which has member function - /// Status HasNext::Next(T*); - template - explicit Iterator(HasNext has_next) - : ptr_(new HasNext(std::move(has_next)), Delete), next_(Next) {} + /// \brief Iterator may be constructed from any type which has a member function + /// with signature Status Next(T*); + template + explicit Iterator(Wrapped has_next) + : ptr_(new Wrapped(std::move(has_next)), Delete), next_(Next) {} Iterator() : ptr_(NULLPTR, NoopDelete) {} @@ -82,28 +82,41 @@ class Iterator { return status; } + /// Iterators will only compare equal if they are both null bool operator==(const Iterator& other) const { return ptr_ == other.ptr_; } - explicit operator bool() const { return ptr_ == NULLPTR; } + explicit operator bool() const { return ptr_ != NULLPTR; } private: + /// Implementation of deleter for ptr_: Casts from void* to the wrapped type and + /// deletes that. template static void Delete(void* ptr) { delete static_cast(ptr); } + /// Noop delete, used only by the default constructed case where ptr_ is null and + /// nothing must be deleted. static void NoopDelete(void*) {} + /// Implementation of Next: Casts from void* to the wrapped type and invokes that + /// type's Next member function. template static Status Next(void* ptr, T* out) { return static_cast(ptr)->Next(out); } + /// ptr_ is a unique_ptr to void with a custom deleter: a function pointer which first + /// casts from void* to a pointer to the wrapped type then deletes that. std::unique_ptr ptr_; + + /// next_ is a function pointer which first casts from void* to a pointer to the wrapped + /// type then invokes its Next member function. Status (*next_)(void*, T*) = NULLPTR; friend Iterator IteratorEnd(internal::type_constant) { - // end condition for an Iterator of Iterators is a default constructed (null) iterator + // The end condition for an Iterator of Iterators is a default constructed (null) + // Iterator. return Iterator(); } }; @@ -123,9 +136,8 @@ class PointerIterator { /// to invoke its Next function template ())>::type, - typename Fn = decltype(std::mem_fun(&Pointed::Next)), - typename T = typename std::remove_pointer< - internal::call_traits::argument_type<1, Fn>>::type> + typename T = typename decltype( + internal::member_function_argument_type<0>(&Pointed::Next))::type> Iterator MakePointerIterator(Ptr ptr) { return Iterator(PointerIterator(std::move(ptr))); } @@ -149,10 +161,10 @@ Iterator MakeFunctionIterator(Fn fn) { } template -Iterator MakeEmptyIterator(Status s = Status::OK()) { - return MakeFunctionIterator([s](T* out) { +Iterator MakeEmptyIterator() { + return MakeFunctionIterator([](T* out) { *out = IteratorEnd(); - return s; + return Status::OK(); }); } From a0129c972cec6fb7320636fc4cb402ad636c0c9f Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 16 Sep 2019 13:01:05 -0400 Subject: [PATCH 3/6] add comment to constructor --- cpp/src/arrow/util/iterator.h | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 3973c997cad..100f8d53dbc 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -52,6 +52,9 @@ class Iterator { public: /// \brief Iterator may be constructed from any type which has a member function /// with signature Status Next(T*); + /// The argument is moved or copied to the heap and kept in a unique_ptr. Only + /// its destructor and its Next method (which are stored in function pointers) are + /// referenced after construction. template explicit Iterator(Wrapped has_next) : ptr_(new Wrapped(std::move(has_next)), Delete), next_(Next) {} From 01445d8a9d71c72a6d02f6335b5f0d750d27c125 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 16 Sep 2019 13:02:07 -0400 Subject: [PATCH 4/6] remove --- cpp/src/arrow/util/iterator.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 100f8d53dbc..2e17da8a51a 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -17,7 +17,6 @@ #pragma once -#include #include #include #include From b57168782cd735b1d34a78593c5da7caf1c5ba72 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 16 Sep 2019 13:06:20 -0400 Subject: [PATCH 5/6] re-add remove_pointer --- cpp/src/arrow/util/iterator.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 2e17da8a51a..2a2da4ecc43 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -138,8 +138,8 @@ class PointerIterator { /// to invoke its Next function template ())>::type, - typename T = typename decltype( - internal::member_function_argument_type<0>(&Pointed::Next))::type> + typename T = typename std::remove_pointer(&Pointed::Next))::type>::type> Iterator MakePointerIterator(Ptr ptr) { return Iterator(PointerIterator(std::move(ptr))); } From 903ce2947258d5e9cae3b3c203daa4b55391b754 Mon Sep 17 00:00:00 2001 From: Benjamin Kietzman Date: Mon, 16 Sep 2019 14:05:17 -0400 Subject: [PATCH 6/6] revert change to WriteRecordBatchReader, refactor IteratorEnd to a trait class --- cpp/src/arrow/dataset/file_parquet_test.cc | 22 ++++----- cpp/src/arrow/record_batch.cc | 1 + cpp/src/arrow/record_batch.h | 2 +- cpp/src/arrow/util/iterator.h | 57 ++++++++++------------ cpp/src/arrow/util/iterator_test.cc | 5 +- cpp/src/parquet/arrow/reader.h | 1 - 6 files changed, 44 insertions(+), 44 deletions(-) diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index e4c2ccd080c..8bb333bb718 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -60,8 +60,8 @@ Status WriteRecordBatch(const RecordBatch& batch, FileWriter* writer) { return Status::OK(); } -Status WriteRecordBatchReader(RecordBatchReader&& reader, FileWriter* writer) { - auto schema = reader.schema(); +Status WriteRecordBatchReader(RecordBatchReader* reader, FileWriter* writer) { + auto schema = reader->schema(); if (!schema->Equals(*writer->schema(), false)) { return Status::Invalid("RecordBatch schema does not match this writer's. batch:'", @@ -69,34 +69,34 @@ Status WriteRecordBatchReader(RecordBatchReader&& reader, FileWriter* writer) { "'"); } - return MakePointerIterator(&reader).Visit( + return MakePointerIterator(reader).Visit( [&](std::shared_ptr batch) -> Status { return WriteRecordBatch(*batch, writer); }); } Status WriteRecordBatchReader( - RecordBatchReader&& reader, MemoryPool* pool, + RecordBatchReader* reader, MemoryPool* pool, const std::shared_ptr& sink, const std::shared_ptr& properties = default_writer_properties(), const std::shared_ptr& arrow_properties = default_arrow_writer_properties()) { std::unique_ptr writer; - RETURN_NOT_OK(FileWriter::Open(*reader.schema(), pool, sink, properties, + RETURN_NOT_OK(FileWriter::Open(*reader->schema(), pool, sink, properties, arrow_properties, &writer)); - RETURN_NOT_OK(WriteRecordBatchReader(std::move(reader), writer.get())); + RETURN_NOT_OK(WriteRecordBatchReader(reader, writer.get())); return writer->Close(); } class ArrowParquetWriterMixin : public ::testing::Test { public: - std::shared_ptr Write(RecordBatchReader&& reader) { + std::shared_ptr Write(RecordBatchReader* reader) { auto pool = ::arrow::default_memory_pool(); std::shared_ptr out; auto sink = CreateOutputStream(pool); - ARROW_EXPECT_OK(WriteRecordBatchReader(std::move(reader), pool, sink)); + ARROW_EXPECT_OK(WriteRecordBatchReader(reader, pool, sink)); ARROW_EXPECT_OK(sink->Finish(&out)); return out; @@ -117,8 +117,8 @@ class ArrowParquetWriterMixin : public ::testing::Test { class ParquetBufferFixtureMixin : public ArrowParquetWriterMixin { public: - std::unique_ptr GetFileSource(RecordBatchReader&& reader) { - auto buffer = Write(std::move(reader)); + std::unique_ptr GetFileSource(RecordBatchReader* reader) { + auto buffer = Write(reader); return internal::make_unique(std::move(buffer)); } @@ -154,7 +154,7 @@ class TestParquetFileFormat : public ParquetBufferFixtureMixin { TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto reader = GetRecordBatchReader(); - auto source = GetFileSource(std::move(*reader)); + auto source = GetFileSource(reader.get()); auto fragment = std::make_shared(*source, opts_); ScanTaskIterator it; diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index c81f6d6e253..24d3200df63 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -29,6 +29,7 @@ #include "arrow/table.h" #include "arrow/type.h" #include "arrow/util/atomic_shared_ptr.h" +#include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/stl.h" diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index 1d8c5cd4884..90350bed6fc 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -22,8 +22,8 @@ #include #include +#include "arrow/status.h" #include "arrow/type_fwd.h" -#include "arrow/util/iterator.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 2a2da4ecc43..38ad382eb70 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -31,19 +31,13 @@ namespace arrow { template class Iterator; -/// \brief IteratorEnd returns a reserved value which indicates the end of iteration. By -/// default this is NULLPTR since most iterators yield pointer types. Overload -/// if different end semantics are required. template -static T IteratorEnd(internal::type_constant) { - return T(NULLPTR); -} - -/// Convenience function, overload the type_constant version above and not this one. -template -static T IteratorEnd() { - return IteratorEnd(internal::type_constant{}); -} +struct IterationTraits { + /// \brief a reserved value which indicates the end of iteration. By + /// default this is NULLPTR since most iterators yield pointer types. + /// Specialize IterationTraits if different end semantics are required. + static T End() { return T(NULLPTR); } +}; /// \brief A generic Iterator that can return errors template @@ -60,7 +54,7 @@ class Iterator { Iterator() : ptr_(NULLPTR, NoopDelete) {} - /// \brief Return the next element of the sequence, IteratorEnd() when the + /// \brief Return the next element of the sequence, IterationTraits::End() when the /// iteration is completed. Calling this on a default constructed Iterator /// will result in undefined behavior. Status Next(T* out) { return next_(ptr_.get(), out); } @@ -71,7 +65,7 @@ class Iterator { Status Visit(Visitor&& visitor) { Status status; - for (T value, end = IteratorEnd();;) { + for (T value, end = IterationTraits::End();;) { status = Next(&value); if (!status.ok()) return status; @@ -115,12 +109,13 @@ class Iterator { /// next_ is a function pointer which first casts from void* to a pointer to the wrapped /// type then invokes its Next member function. Status (*next_)(void*, T*) = NULLPTR; +}; - friend Iterator IteratorEnd(internal::type_constant) { - // The end condition for an Iterator of Iterators is a default constructed (null) - // Iterator. - return Iterator(); - } +template +struct IterationTraits> { + // The end condition for an Iterator of Iterators is a default constructed (null) + // Iterator. + static Iterator End() { return Iterator(); } }; template @@ -165,7 +160,7 @@ Iterator MakeFunctionIterator(Fn fn) { template Iterator MakeEmptyIterator() { return MakeFunctionIterator([](T* out) { - *out = IteratorEnd(); + *out = IterationTraits::End(); return Status::OK(); }); } @@ -177,7 +172,8 @@ class VectorIterator { explicit VectorIterator(std::vector v) : elements_(std::move(v)) {} Status Next(T* out) { - *out = i_ == elements_.size() ? IteratorEnd() : std::move(elements_[i_++]); + *out = + i_ == elements_.size() ? IterationTraits::End() : std::move(elements_[i_++]); return Status::OK(); } @@ -207,7 +203,8 @@ class MapIterator { ARROW_RETURN_NOT_OK(it_.Next(&i)); // Ensure loops exit. - *out = i == IteratorEnd() ? IteratorEnd() : map_(std::move(i)); + *out = + i == IterationTraits::End() ? IterationTraits::End() : map_(std::move(i)); return Status::OK(); } @@ -240,8 +237,8 @@ class MaybeMapIterator { I i; ARROW_RETURN_NOT_OK(it_.Next(&i)); - if (i == IteratorEnd()) { - *out = IteratorEnd(); + if (i == IterationTraits::End()) { + *out = IterationTraits::End(); return Status::OK(); } @@ -272,23 +269,23 @@ class FlattenIterator { Status Next(T* out) { if (done_) { - *out = IteratorEnd(); + *out = IterationTraits::End(); return Status::OK(); } - if (child_ == IteratorEnd>()) { + if (child_ == IterationTraits>::End()) { // Pop from parent's iterator. ARROW_RETURN_NOT_OK(parent_.Next(&child_)); // Check if final iteration reached. - done_ = (child_ == IteratorEnd>()); + done_ = (child_ == IterationTraits>::End()); return Next(out); } // Pop from child_ and lookout for depletion. ARROW_RETURN_NOT_OK(child_.Next(out)); - if (*out == IteratorEnd()) { + if (*out == IterationTraits::End()) { // Reset state such that we pop from parent on the recursive call - child_ = IteratorEnd>(); + child_ = IterationTraits>::End(); return Next(out); } @@ -297,7 +294,7 @@ class FlattenIterator { private: Iterator> parent_; - Iterator child_ = IteratorEnd>(); + Iterator child_ = IterationTraits>::End(); // The usage of done_ could be avoided by setting parent_ to null, but this // would hamper debugging. bool done_ = false; diff --git a/cpp/src/arrow/util/iterator_test.cc b/cpp/src/arrow/util/iterator_test.cc index edb85c1db43..15f8044bd1a 100644 --- a/cpp/src/arrow/util/iterator_test.cc +++ b/cpp/src/arrow/util/iterator_test.cc @@ -45,8 +45,11 @@ struct TestInt { int value; bool operator==(const TestInt& other) const { return value == other.value; } +}; - friend TestInt IteratorEnd(internal::type_constant) { return TestInt(); } +template <> +struct IterationTraits { + static TestInt End() { return TestInt(); } }; template diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index a9fb978f686..17ebf925bb9 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -22,7 +22,6 @@ #include #include -#include "arrow/util/iterator.h" #include "parquet/file_reader.h" #include "parquet/platform.h" #include "parquet/properties.h"