diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 66e25d404d0..548f90612b3 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -31,7 +31,7 @@ SimpleDataFragment::SimpleDataFragment( : record_batches_(std::move(record_batches)) {} Status SimpleDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { + ScanTaskIterator* out) { // Make an explicit copy of record_batches_ to ensure Scan can be called // multiple times. auto it = MakeVectorIterator(record_batches_); diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index a30c977a72c..4e75112ad4a 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -39,7 +39,7 @@ class ARROW_DS_EXPORT DataFragment { /// \brief Scan returns an iterator of ScanTasks, each of which yields /// RecordBatches from this DataFragment. virtual Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) = 0; + ScanTaskIterator* out) = 0; /// \brief Return true if the fragment can benefit from parallel /// scanning @@ -60,8 +60,7 @@ class ARROW_DS_EXPORT SimpleDataFragment : public DataFragment { public: explicit SimpleDataFragment(std::vector> record_batches); - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::shared_ptr scan_context, ScanTaskIterator* out) override; bool splittable() const override { return false; } @@ -78,8 +77,7 @@ class ARROW_DS_EXPORT DataSource { public: /// \brief GetFragments returns an iterator of DataFragments. The ScanOptions /// controls filtering and schema inference. - virtual std::unique_ptr GetFragments( - std::shared_ptr options) = 0; + virtual DataFragmentIterator GetFragments(std::shared_ptr options) = 0; virtual std::string type() const = 0; @@ -92,8 +90,7 @@ class ARROW_DS_EXPORT SimpleDataSource : public DataSource { explicit SimpleDataSource(DataFragmentVector fragments) : fragments_(std::move(fragments)) {} - std::unique_ptr GetFragments( - std::shared_ptr options) override { + DataFragmentIterator GetFragments(std::shared_ptr options) override { return MakeVectorIterator(fragments_); } diff --git a/cpp/src/arrow/dataset/dataset_internal.h b/cpp/src/arrow/dataset/dataset_internal.h index 4dd42b7a865..0e7b3226706 100644 --- a/cpp/src/arrow/dataset/dataset_internal.h +++ b/cpp/src/arrow/dataset/dataset_internal.h @@ -163,16 +163,15 @@ constexpr int RecordBatchProjector::kNoMatch; class ProjectedRecordBatchReader : public RecordBatchReader { public: static Status Make(MemoryPool* pool, RecordBatchProjector projector, - std::unique_ptr wrapped, - std::unique_ptr* out) { - out->reset( - new ProjectedRecordBatchReader(pool, std::move(projector), std::move(wrapped))); + RecordBatchIterator wrapped, RecordBatchIterator* out) { + *out = RecordBatchIterator( + ProjectedRecordBatchReader(pool, std::move(projector), std::move(wrapped))); return Status::OK(); } Status ReadNext(std::shared_ptr* out) override { std::shared_ptr rb; - RETURN_NOT_OK(wrapped_->Next(&rb)); + RETURN_NOT_OK(wrapped_.Next(&rb)); if (rb == nullptr) { *out = nullptr; return Status::OK(); @@ -185,11 +184,11 @@ class ProjectedRecordBatchReader : public RecordBatchReader { private: ProjectedRecordBatchReader(MemoryPool* pool, RecordBatchProjector projector, - std::unique_ptr wrapped) + RecordBatchIterator wrapped) : projector_(std::move(projector)), wrapped_(std::move(wrapped)) {} RecordBatchProjector projector_; - std::unique_ptr wrapped_; + RecordBatchIterator wrapped_; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index acc45b8bf9a..20f4944c830 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -41,7 +41,7 @@ Status FileSource::Open(std::shared_ptr* out) const } Status FileBasedDataFragment::Scan(std::shared_ptr scan_context, - std::unique_ptr* out) { + ScanTaskIterator* out) { return format_->ScanFile(source_, scan_options_, scan_context, out); } @@ -75,7 +75,7 @@ Status FileSystemBasedDataSource::Make(fs::FileSystem* filesystem, return Status::OK(); } -std::unique_ptr FileSystemBasedDataSource::GetFragments( +DataFragmentIterator FileSystemBasedDataSource::GetFragments( std::shared_ptr options) { struct Impl : DataFragmentIterator { Impl(fs::FileSystem* filesystem, std::shared_ptr format, @@ -105,7 +105,7 @@ std::unique_ptr FileSystemBasedDataSource::GetFragments( std::vector stats_; }; - return internal::make_unique(filesystem_, format_, options, stats_); + return DataFragmentIterator(Impl(filesystem_, format_, options, stats_)); } } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index 2053342dcf7..f3add99b409 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -131,7 +131,7 @@ class ARROW_DS_EXPORT FileFormat { virtual Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const = 0; + ScanTaskIterator* out) const = 0; /// \brief Open a fragment virtual Status MakeFragment(const FileSource& location, @@ -148,8 +148,7 @@ class ARROW_DS_EXPORT FileBasedDataFragment : public DataFragment { format_(std::move(format)), scan_options_(std::move(scan_options)) {} - Status Scan(std::shared_ptr scan_context, - std::unique_ptr* out) override; + Status Scan(std::shared_ptr scan_context, ScanTaskIterator* out) override; const FileSource& source() const { return source_; } std::shared_ptr format() const { return format_; } @@ -176,8 +175,7 @@ class ARROW_DS_EXPORT FileSystemBasedDataSource : public DataSource { std::string type() const override { return "directory"; } - std::unique_ptr GetFragments( - std::shared_ptr options) override; + DataFragmentIterator GetFragments(std::shared_ptr options) override; protected: FileSystemBasedDataSource(fs::FileSystem* filesystem, const fs::Selector& selector, diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 6b02085a53c..ea669713b2a 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -62,7 +62,7 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_feather.h b/cpp/src/arrow/dataset/file_feather.h index c91585586ad..a9d6b025859 100644 --- a/cpp/src/arrow/dataset/file_feather.h +++ b/cpp/src/arrow/dataset/file_feather.h @@ -49,7 +49,7 @@ class ARROW_DS_EXPORT FeatherFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_json.h b/cpp/src/arrow/dataset/file_json.h index e24cc0d9c45..d3023693c7a 100644 --- a/cpp/src/arrow/dataset/file_json.h +++ b/cpp/src/arrow/dataset/file_json.h @@ -54,7 +54,7 @@ class ARROW_DS_EXPORT JsonFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; }; } // namespace dataset diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index b761ab24814..77c3a71e2d3 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -54,8 +54,8 @@ class ParquetScanTask : public ScanTask { return Status::OK(); } - std::unique_ptr Scan() override { - return std::move(record_batch_reader_); + RecordBatchIterator Scan() { + return MakePointerIterator(std::move(record_batch_reader_)); } private: @@ -107,11 +107,11 @@ class ParquetRowGroupPartitioner { int num_row_groups_; }; -class ParquetScanTaskIterator : public ScanTaskIterator { +class ParquetScanTaskIterator { public: static Status Make(std::shared_ptr options, std::shared_ptr context, ParquetFileReaderPtr reader, - std::unique_ptr* out) { + ScanTaskIterator* out) { auto metadata = reader->metadata(); std::vector columns_projection; @@ -121,13 +121,13 @@ class ParquetScanTaskIterator : public ScanTaskIterator { RETURN_NOT_OK(parquet::arrow::FileReader::Make(context->pool, std::move(reader), &arrow_reader)); - out->reset(new ParquetScanTaskIterator(columns_projection, metadata, - std::move(arrow_reader))); + *out = ScanTaskIterator( + ParquetScanTaskIterator(columns_projection, metadata, std::move(arrow_reader))); return Status::OK(); } - Status Next(ScanTaskPtr* task) override { + Status Next(ScanTaskPtr* task) { auto partition = partitionner_.Next(); // Iteration is done. @@ -166,7 +166,7 @@ class ParquetScanTaskIterator : public ScanTaskIterator { Status ParquetFileFormat::ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const { + ScanTaskIterator* out) const { std::shared_ptr input; RETURN_NOT_OK(source.Open(&input)); diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index fd462588317..a04f4596f87 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -50,7 +50,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \brief Open a file for scanning Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override; + ScanTaskIterator* out) const override; Status MakeFragment(const FileSource& source, std::shared_ptr opts, std::unique_ptr* out) override; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 058a49d6796..8bb333bb718 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -69,9 +69,10 @@ Status WriteRecordBatchReader(RecordBatchReader* reader, FileWriter* writer) { "'"); } - return reader->Visit([&](std::shared_ptr batch) -> Status { - return WriteRecordBatch(*batch, writer); - }); + return MakePointerIterator(reader).Visit( + [&](std::shared_ptr batch) -> Status { + return WriteRecordBatch(*batch, writer); + }); } Status WriteRecordBatchReader( @@ -156,13 +157,13 @@ TEST_F(TestParquetFileFormat, ScanRecordBatchReader) { auto source = GetFileSource(reader.get()); auto fragment = std::make_shared(*source, opts_); - std::unique_ptr it; + ScanTaskIterator it; ASSERT_OK(fragment->Scan(ctx_, &it)); int64_t row_count = 0; - ASSERT_OK(it->Visit([&row_count](std::unique_ptr task) -> Status { + ASSERT_OK(it.Visit([&row_count](std::unique_ptr task) -> Status { auto batch_it = task->Scan(); - return batch_it->Visit([&row_count](std::shared_ptr batch) -> Status { + return batch_it.Visit([&row_count](std::shared_ptr batch) -> Status { row_count += batch->num_rows(); return Status::OK(); }); diff --git a/cpp/src/arrow/dataset/partition.h b/cpp/src/arrow/dataset/partition.h index 28c55adcc10..d6adf157432 100644 --- a/cpp/src/arrow/dataset/partition.h +++ b/cpp/src/arrow/dataset/partition.h @@ -151,8 +151,7 @@ class ARROW_DS_EXPORT Partition : public DataSource { /// e.g. for the top-level partitioned source container virtual const PartitionKey* key() const = 0; - virtual std::unique_ptr GetFragments( - const Selector& selector) = 0; + virtual DataFragmentIterator GetFragments(const Selector& selector) = 0; }; /// \brief Simple implementation of Partition, which consists of a @@ -176,8 +175,7 @@ class ARROW_DS_EXPORT SimplePartition : public Partition { const PartitionVector& subpartitions() const { return subpartitions_; } const DataFragmentVector& data_fragments() const { return data_fragments_; } - std::unique_ptr GetFragments( - const FilterVector& filters) override; + DataFragmentIterator GetFragments(const FilterVector& filters) override; private: std::unique_ptr key_; @@ -200,13 +198,12 @@ class ARROW_DS_EXPORT LazyPartition : public Partition { public: const PartitionKey* key() const override; - std::unique_ptr GetFragments( - const& DataSelector selector) override; + DataFragmentIterator GetFragments(const& DataSelector selector) override; // TODO(wesm): Iterate over subpartitions protected: - std::unique_ptr partition_iter_; + PartitionIterator partition_iter_; // By default, once this source is consumed using GetFragments, it // cannot be consumed again. By setting this to true, we cache diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 4ad0f6a537c..1e5b8a8f2f7 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -24,21 +24,20 @@ namespace arrow { namespace dataset { -std::unique_ptr SimpleScanTask::Scan() { - return MakeVectorIterator(record_batches_); -} +RecordBatchIterator SimpleScanTask::Scan() { return MakeVectorIterator(record_batches_); } /// \brief GetFragmentsIterator transforms a vector in a flattened /// Iterator. -static std::unique_ptr GetFragmentsIterator( +static DataFragmentIterator GetFragmentsIterator( const std::vector>& sources, std::shared_ptr options) { // Iterator auto sources_it = MakeVectorIterator(sources); // DataSource -> Iterator - auto fn = [options](std::shared_ptr source) - -> std::unique_ptr { return source->GetFragments(options); }; + auto fn = [options](std::shared_ptr source) -> DataFragmentIterator { + return source->GetFragments(options); + }; // Iterator> auto fragments_it = MakeMapIterator(fn, std::move(sources_it)); @@ -49,12 +48,11 @@ static std::unique_ptr GetFragmentsIterator( /// \brief GetScanTaskIterator transforms an Iterator in a /// flattened Iterator. -static std::unique_ptr GetScanTaskIterator( - std::unique_ptr fragments, - std::shared_ptr context) { +static ScanTaskIterator GetScanTaskIterator(DataFragmentIterator fragments, + std::shared_ptr context) { // DataFragment -> ScanTaskIterator auto fn = [context](std::shared_ptr fragment, - std::unique_ptr* out) -> Status { + ScanTaskIterator* out) -> Status { return fragment->Scan(context, out); }; @@ -65,7 +63,7 @@ static std::unique_ptr GetScanTaskIterator( return MakeFlattenIterator(std::move(maybe_scantask_it)); } -std::unique_ptr SimpleScanner::Scan() { +ScanTaskIterator SimpleScanner::Scan() { // First, transforms DataSources in a flat Iterator. This // iterator is lazily constructed, i.e. DataSource::GetFragments is never // invoked. diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 9177a5418d7..023290cc8b8 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -64,7 +64,7 @@ class ARROW_DS_EXPORT ScanTask { /// \brief Iterate through sequence of materialized record batches /// resulting from the Scan. Execution semantics encapsulated in the /// particular ScanTask implementation - virtual std::unique_ptr Scan() = 0; + virtual RecordBatchIterator Scan() = 0; virtual ~ScanTask() = default; }; @@ -75,7 +75,7 @@ class ARROW_DS_EXPORT SimpleScanTask : public ScanTask { explicit SimpleScanTask(std::vector> record_batches) : record_batches_(std::move(record_batches)) {} - std::unique_ptr Scan() override; + RecordBatchIterator Scan() override; protected: std::vector> record_batches_; @@ -95,7 +95,7 @@ class ARROW_DS_EXPORT Scanner { /// \brief The Scan operator returns a stream of ScanTask. The caller is /// responsible to dispatch/schedule said tasks. Tasks should be safe to run /// in a concurrent fashion and outlive the iterator. - virtual std::unique_ptr Scan() = 0; + virtual ScanTaskIterator Scan() = 0; virtual ~Scanner() = default; }; @@ -122,7 +122,7 @@ class ARROW_DS_EXPORT SimpleScanner : public Scanner { options_(std::move(options)), context_(std::move(context)) {} - std::unique_ptr Scan() override; + ScanTaskIterator Scan() override; private: std::vector> sources_; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index c4cbaef0dd7..74b81b90641 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -72,7 +72,7 @@ class DatasetFixtureMixin : public ::testing::Test { void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, bool ensure_drained = true) { auto it = task->Scan(); - ARROW_EXPECT_OK(it->Visit([expected](std::shared_ptr rhs) -> Status { + ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr rhs) -> Status { std::shared_ptr lhs; RETURN_NOT_OK(expected->ReadNext(&lhs)); EXPECT_NE(lhs, nullptr); @@ -89,10 +89,10 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, DataFragment* fragment, bool ensure_drained = true) { - std::unique_ptr it; + ScanTaskIterator it; ARROW_EXPECT_OK(fragment->Scan(ctx_, &it)); - ARROW_EXPECT_OK(it->Visit([&](std::unique_ptr task) -> Status { + ARROW_EXPECT_OK(it.Visit([&](std::unique_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); return Status::OK(); })); @@ -108,7 +108,7 @@ class DatasetFixtureMixin : public ::testing::Test { bool ensure_drained = true) { auto it = source->GetFragments(options_); - ARROW_EXPECT_OK(it->Visit([&](std::shared_ptr fragment) -> Status { + ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr fragment) -> Status { AssertFragmentEquals(expected, fragment.get(), false); return Status::OK(); })); @@ -124,7 +124,7 @@ class DatasetFixtureMixin : public ::testing::Test { bool ensure_drained = true) { auto it = scanner->Scan(); - ARROW_EXPECT_OK(it->Visit([&](std::unique_ptr task) -> Status { + ARROW_EXPECT_OK(it.Visit([&](std::unique_ptr task) -> Status { AssertScanTaskEquals(expected, task.get(), false); return Status::OK(); })); @@ -198,7 +198,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { int count = 0; ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -219,7 +219,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { int count = 0; ASSERT_OK( - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); ++count; @@ -242,7 +242,7 @@ class FileSystemBasedDataSourceMixin : public FileSourceFixtureMixin { ASSERT_RAISES( IOError, - source_->GetFragments({})->Visit([&](std::shared_ptr fragment) { + source_->GetFragments({}).Visit([&](std::shared_ptr fragment) { auto file_fragment = internal::checked_pointer_cast(fragment); auto extension = @@ -278,8 +278,8 @@ class DummyFileFormat : public FileFormat { /// \brief Open a file for scanning (always returns an empty iterator) Status ScanFile(const FileSource& source, std::shared_ptr scan_options, std::shared_ptr scan_context, - std::unique_ptr* out) const override { - *out = internal::make_unique>>(); + ScanTaskIterator* out) const override { + *out = MakeEmptyIterator>(); return Status::OK(); } diff --git a/cpp/src/arrow/record_batch.cc b/cpp/src/arrow/record_batch.cc index 1e4ad45a3d2..24d3200df63 100644 --- a/cpp/src/arrow/record_batch.cc +++ b/cpp/src/arrow/record_batch.cc @@ -29,6 +29,7 @@ #include "arrow/table.h" #include "arrow/type.h" #include "arrow/util/atomic_shared_ptr.h" +#include "arrow/util/iterator.h" #include "arrow/util/logging.h" #include "arrow/util/stl.h" @@ -286,7 +287,7 @@ Status RecordBatchReader::ReadAll(std::shared_ptr* table) { class SimpleRecordBatchReader : public RecordBatchReader { public: - SimpleRecordBatchReader(std::unique_ptr>> it, + SimpleRecordBatchReader(Iterator> it, std::shared_ptr schema) : schema_(schema), it_(std::move(it)) {} @@ -295,14 +296,14 @@ class SimpleRecordBatchReader : public RecordBatchReader { : schema_(schema), it_(MakeVectorIterator(batches)) {} Status ReadNext(std::shared_ptr* batch) override { - return it_->Next(batch); + return it_.Next(batch); } std::shared_ptr schema() const override { return schema_; } protected: std::shared_ptr schema_; - std::unique_ptr>> it_; + Iterator> it_; }; Status MakeRecordBatchReader(const std::vector>& batches, diff --git a/cpp/src/arrow/record_batch.h b/cpp/src/arrow/record_batch.h index c3b003edf68..90350bed6fc 100644 --- a/cpp/src/arrow/record_batch.h +++ b/cpp/src/arrow/record_batch.h @@ -22,8 +22,8 @@ #include #include +#include "arrow/status.h" #include "arrow/type_fwd.h" -#include "arrow/util/iterator.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -167,12 +167,10 @@ class ARROW_EXPORT RecordBatch { }; /// \brief Abstract interface for reading stream of record batches -class ARROW_EXPORT RecordBatchReader - /// \cond FALSE - : public RecordBatchIterator -/// \endcond -{ // NOLINT whitespace/braces +class ARROW_EXPORT RecordBatchReader { public: + virtual ~RecordBatchReader() = default; + /// \return the shared schema of the record batches in the stream virtual std::shared_ptr schema() const = 0; @@ -183,7 +181,7 @@ class ARROW_EXPORT RecordBatchReader /// \return Status virtual Status ReadNext(std::shared_ptr* batch) = 0; - Status Next(std::shared_ptr* batch) final { return ReadNext(batch); } + Status Next(std::shared_ptr* batch) { return ReadNext(batch); } /// \brief Consume entire stream as a vector of record batches Status ReadAll(std::vector>* batches); diff --git a/cpp/src/arrow/table.cc b/cpp/src/arrow/table.cc index 41794d389c2..880d0f65cc9 100644 --- a/cpp/src/arrow/table.cc +++ b/cpp/src/arrow/table.cc @@ -555,98 +555,71 @@ Status Table::CombineChunks(MemoryPool* pool, std::shared_ptr
* out) const // ---------------------------------------------------------------------- // Convert a table to a sequence of record batches -class TableBatchReader::TableBatchReaderImpl { - public: - explicit TableBatchReaderImpl(const Table& table) - : table_(table), - column_data_(table.num_columns()), - chunk_numbers_(table.num_columns(), 0), - chunk_offsets_(table.num_columns(), 0), - absolute_row_position_(0), - max_chunksize_(std::numeric_limits::max()) { - for (int i = 0; i < table.num_columns(); ++i) { - column_data_[i] = table.column(i).get(); - } +TableBatchReader::TableBatchReader(const Table& table) + : table_(table), + column_data_(table.num_columns()), + chunk_numbers_(table.num_columns(), 0), + chunk_offsets_(table.num_columns(), 0), + absolute_row_position_(0), + max_chunksize_(std::numeric_limits::max()) { + for (int i = 0; i < table.num_columns(); ++i) { + column_data_[i] = table.column(i).get(); } +} - Status ReadNext(std::shared_ptr* out) { - if (absolute_row_position_ == table_.num_rows()) { - *out = nullptr; - return Status::OK(); - } - - // Determine the minimum contiguous slice across all columns - int64_t chunksize = std::min(table_.num_rows(), max_chunksize_); - std::vector chunks(table_.num_columns()); - for (int i = 0; i < table_.num_columns(); ++i) { - auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get(); - int64_t chunk_remaining = chunk->length() - chunk_offsets_[i]; - - if (chunk_remaining < chunksize) { - chunksize = chunk_remaining; - } - - chunks[i] = chunk; - } - - // Slice chunks and advance chunk index as appropriate - std::vector> batch_data(table_.num_columns()); - - for (int i = 0; i < table_.num_columns(); ++i) { - // Exhausted chunk - const Array* chunk = chunks[i]; - const int64_t offset = chunk_offsets_[i]; - std::shared_ptr slice_data; - if ((chunk->length() - offset) == chunksize) { - ++chunk_numbers_[i]; - chunk_offsets_[i] = 0; - if (offset > 0) { - // Need to slice - slice_data = chunk->Slice(offset, chunksize)->data(); - } else { - // No slice - slice_data = chunk->data(); - } - } else { - chunk_offsets_[i] += chunksize; - slice_data = chunk->Slice(offset, chunksize)->data(); - } - batch_data[i] = std::move(slice_data); - } +std::shared_ptr TableBatchReader::schema() const { return table_.schema(); } - absolute_row_position_ += chunksize; - *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data)); +void TableBatchReader::set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; } +Status TableBatchReader::ReadNext(std::shared_ptr* out) { + if (absolute_row_position_ == table_.num_rows()) { + *out = nullptr; return Status::OK(); } - std::shared_ptr schema() const { return table_.schema(); } + // Determine the minimum contiguous slice across all columns + int64_t chunksize = std::min(table_.num_rows(), max_chunksize_); + std::vector chunks(table_.num_columns()); + for (int i = 0; i < table_.num_columns(); ++i) { + auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get(); + int64_t chunk_remaining = chunk->length() - chunk_offsets_[i]; - void set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; } - - private: - const Table& table_; - std::vector column_data_; - std::vector chunk_numbers_; - std::vector chunk_offsets_; - int64_t absolute_row_position_; - int64_t max_chunksize_; -}; + if (chunk_remaining < chunksize) { + chunksize = chunk_remaining; + } -TableBatchReader::TableBatchReader(const Table& table) { - impl_.reset(new TableBatchReaderImpl(table)); -} + chunks[i] = chunk; + } -TableBatchReader::~TableBatchReader() {} + // Slice chunks and advance chunk index as appropriate + std::vector> batch_data(table_.num_columns()); -std::shared_ptr TableBatchReader::schema() const { return impl_->schema(); } + for (int i = 0; i < table_.num_columns(); ++i) { + // Exhausted chunk + const Array* chunk = chunks[i]; + const int64_t offset = chunk_offsets_[i]; + std::shared_ptr slice_data; + if ((chunk->length() - offset) == chunksize) { + ++chunk_numbers_[i]; + chunk_offsets_[i] = 0; + if (offset > 0) { + // Need to slice + slice_data = chunk->Slice(offset, chunksize)->data(); + } else { + // No slice + slice_data = chunk->data(); + } + } else { + chunk_offsets_[i] += chunksize; + slice_data = chunk->Slice(offset, chunksize)->data(); + } + batch_data[i] = std::move(slice_data); + } -void TableBatchReader::set_chunksize(int64_t chunksize) { - impl_->set_chunksize(chunksize); -} + absolute_row_position_ += chunksize; + *out = RecordBatch::Make(table_.schema(), chunksize, std::move(batch_data)); -Status TableBatchReader::ReadNext(std::shared_ptr* out) { - return impl_->ReadNext(out); + return Status::OK(); } } // namespace arrow diff --git a/cpp/src/arrow/table.h b/cpp/src/arrow/table.h index 514b02769df..f3ae422b9fb 100644 --- a/cpp/src/arrow/table.h +++ b/cpp/src/arrow/table.h @@ -277,8 +277,6 @@ class ARROW_EXPORT Table { /// of the table's columns. class ARROW_EXPORT TableBatchReader : public RecordBatchReader { public: - ~TableBatchReader() override; - /// \brief Construct a TableBatchReader for the given table explicit TableBatchReader(const Table& table); @@ -293,8 +291,12 @@ class ARROW_EXPORT TableBatchReader : public RecordBatchReader { void set_chunksize(int64_t chunksize); private: - class TableBatchReaderImpl; - std::unique_ptr impl_; + const Table& table_; + std::vector column_data_; + std::vector chunk_numbers_; + std::vector chunk_offsets_; + int64_t absolute_row_position_; + int64_t max_chunksize_; }; /// \brief Construct table from multiple input tables. diff --git a/cpp/src/arrow/util/functional.h b/cpp/src/arrow/util/functional.h index 73542e30021..27ef43c2b72 100644 --- a/cpp/src/arrow/util/functional.h +++ b/cpp/src/arrow/util/functional.h @@ -66,5 +66,16 @@ struct call_traits { using argument_type = decltype(argument_type_impl(&std::decay::type::operator())); }; +template +struct type_constant { + using type = T; +}; + +template +constexpr type_constant>::type> +member_function_argument_type(R (T::*fn)(A...)) { + return {}; +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/iterator.h b/cpp/src/arrow/util/iterator.h index 328b8e946bc..38ad382eb70 100644 --- a/cpp/src/arrow/util/iterator.h +++ b/cpp/src/arrow/util/iterator.h @@ -28,84 +28,152 @@ namespace arrow { +template +class Iterator; + +template +struct IterationTraits { + /// \brief a reserved value which indicates the end of iteration. By + /// default this is NULLPTR since most iterators yield pointer types. + /// Specialize IterationTraits if different end semantics are required. + static T End() { return T(NULLPTR); } +}; + /// \brief A generic Iterator that can return errors template class Iterator { public: - virtual ~Iterator() = default; - - /// \brief Return the next element of the sequence, nullptr when the - /// iteration is completed - virtual Status Next(T* out) = 0; + /// \brief Iterator may be constructed from any type which has a member function + /// with signature Status Next(T*); + /// The argument is moved or copied to the heap and kept in a unique_ptr. Only + /// its destructor and its Next method (which are stored in function pointers) are + /// referenced after construction. + template + explicit Iterator(Wrapped has_next) + : ptr_(new Wrapped(std::move(has_next)), Delete), next_(Next) {} + + Iterator() : ptr_(NULLPTR, NoopDelete) {} + + /// \brief Return the next element of the sequence, IterationTraits::End() when the + /// iteration is completed. Calling this on a default constructed Iterator + /// will result in undefined behavior. + Status Next(T* out) { return next_(ptr_.get(), out); } /// Pass each element of the sequence to a visitor. Will return any error status /// returned by the visitor, terminating iteration. template Status Visit(Visitor&& visitor) { Status status; - T value; - for (;;) { + for (T value, end = IterationTraits::End();;) { status = Next(&value); if (!status.ok()) return status; - if (value == NULLPTR) break; + if (value == end) break; ARROW_RETURN_NOT_OK(visitor(std::move(value))); } return status; } + + /// Iterators will only compare equal if they are both null + bool operator==(const Iterator& other) const { return ptr_ == other.ptr_; } + + explicit operator bool() const { return ptr_ != NULLPTR; } + + private: + /// Implementation of deleter for ptr_: Casts from void* to the wrapped type and + /// deletes that. + template + static void Delete(void* ptr) { + delete static_cast(ptr); + } + + /// Noop delete, used only by the default constructed case where ptr_ is null and + /// nothing must be deleted. + static void NoopDelete(void*) {} + + /// Implementation of Next: Casts from void* to the wrapped type and invokes that + /// type's Next member function. + template + static Status Next(void* ptr, T* out) { + return static_cast(ptr)->Next(out); + } + + /// ptr_ is a unique_ptr to void with a custom deleter: a function pointer which first + /// casts from void* to a pointer to the wrapped type then deletes that. + std::unique_ptr ptr_; + + /// next_ is a function pointer which first casts from void* to a pointer to the wrapped + /// type then invokes its Next member function. + Status (*next_)(void*, T*) = NULLPTR; }; -template -class FunctionIterator : public Iterator { - public: - using IteratorType = Iterator; +template +struct IterationTraits> { + // The end condition for an Iterator of Iterators is a default constructed (null) + // Iterator. + static Iterator End() { return Iterator(); } +}; - explicit FunctionIterator(Fn fn) : fn_(std::move(fn)) {} +template +class PointerIterator { + public: + explicit PointerIterator(Ptr ptr) : ptr_(std::move(ptr)) {} - Status Next(T* out) override { return fn_(out); } + Status Next(T* out) { return ptr_->Next(out); } private: - Fn fn_; + Ptr ptr_; }; -template >::type> -std::unique_ptr> MakeFunctionIterator(Fn fn) { - return std::unique_ptr>( - new FunctionIterator(std::move(fn))); +/// \brief Construct an Iterator which dereferences a (possibly smart) pointer +/// to invoke its Next function +template ())>::type, + typename T = typename std::remove_pointer(&Pointed::Next))::type>::type> +Iterator MakePointerIterator(Ptr ptr) { + return Iterator(PointerIterator(std::move(ptr))); } -template -class EmptyIterator : public Iterator { +template +class FunctionIterator { public: - explicit EmptyIterator(Status s = Status::OK()) : status_(s) {} + explicit FunctionIterator(Fn fn) : fn_(std::move(fn)) {} - Status Next(T* out) override { - *out = NULLPTR; - return status_; - } + Status Next(T* out) { return fn_(out); } private: - Status status_; + Fn fn_; }; +/// \brief Construct an Iterator which invokes a callable on Next() +template >::type> +Iterator MakeFunctionIterator(Fn fn) { + return Iterator(FunctionIterator(std::move(fn))); +} + template -std::unique_ptr> MakeEmptyIterator() { - return std::unique_ptr>(new EmptyIterator()); +Iterator MakeEmptyIterator() { + return MakeFunctionIterator([](T* out) { + *out = IterationTraits::End(); + return Status::OK(); + }); } /// \brief Simple iterator which yields the elements of a std::vector template -class VectorIterator : public Iterator { +class VectorIterator { public: explicit VectorIterator(std::vector v) : elements_(std::move(v)) {} - Status Next(T* out) override { - *out = i_ == elements_.size() ? NULLPTR : std::move(elements_[i_++]); + Status Next(T* out) { + *out = + i_ == elements_.size() ? IterationTraits::End() : std::move(elements_[i_++]); return Status::OK(); } @@ -115,8 +183,8 @@ class VectorIterator : public Iterator { }; template -std::unique_ptr> MakeVectorIterator(std::vector v) { - return std::unique_ptr>(new VectorIterator(std::move(v))); +Iterator MakeVectorIterator(std::vector v) { + return Iterator(VectorIterator(std::move(v))); } /// \brief MapIterator takes ownership of an iterator and a function to apply @@ -125,33 +193,33 @@ template >::type, typename O = typename std::result_of::type> -class MapIterator : public Iterator { +class MapIterator { public: - explicit MapIterator(Fn map, std::unique_ptr> it) + explicit MapIterator(Fn map, Iterator it) : map_(std::move(map)), it_(std::move(it)) {} - Status Next(O* out) override { + Status Next(O* out) { I i; - ARROW_RETURN_NOT_OK(it_->Next(&i)); + ARROW_RETURN_NOT_OK(it_.Next(&i)); // Ensure loops exit. - *out = (i == NULLPTR) ? NULLPTR : map_(std::move(i)); + *out = + i == IterationTraits::End() ? IterationTraits::End() : map_(std::move(i)); return Status::OK(); } private: Fn map_; - std::unique_ptr> it_; + Iterator it_; }; template >::type, typename O = typename std::result_of::type> -std::unique_ptr> MakeMapIterator(Fn map, std::unique_ptr> it) { - return std::unique_ptr>( - new MapIterator(std::move(map), std::move(it))); +Iterator MakeMapIterator(Fn map, Iterator it) { + return Iterator(MapIterator(std::move(map), std::move(it))); } /// \brief Like MapIterator, but where the function can fail. @@ -161,17 +229,16 @@ template < typename std::remove_pointer>::type, typename O = typename std::remove_pointer>::typ> -class MaybeMapIterator : public Iterator { +class MaybeMapIterator { public: - explicit MaybeMapIterator(Fn map, std::unique_ptr> it) - : map_(map), it_(std::move(it)) {} + explicit MaybeMapIterator(Fn map, Iterator it) : map_(map), it_(std::move(it)) {} - Status Next(O* out) override { + Status Next(O* out) { I i; - ARROW_RETURN_NOT_OK(it_->Next(&i)); - if (i == NULLPTR) { - *out = NULLPTR; + ARROW_RETURN_NOT_OK(it_.Next(&i)); + if (i == IterationTraits::End()) { + *out = IterationTraits::End(); return Status::OK(); } @@ -180,7 +247,7 @@ class MaybeMapIterator : public Iterator { private: Fn map_; - std::unique_ptr> it_; + Iterator it_; }; template < @@ -189,38 +256,36 @@ template < typename std::remove_pointer>::type, typename O = typename std::remove_pointer>::type> -std::unique_ptr> MakeMaybeMapIterator(Fn map, - std::unique_ptr> it) { - return std::unique_ptr>( - new MaybeMapIterator(std::move(map), std::move(it))); +Iterator MakeMaybeMapIterator(Fn map, Iterator it) { + return Iterator(MaybeMapIterator(std::move(map), std::move(it))); } /// \brief FlattenIterator takes an iterator generating iterators and yields a /// unified iterator that flattens/concatenates in a single stream. -template >> -class FlattenIterator : public Iterator { +template +class FlattenIterator { public: - explicit FlattenIterator(std::unique_ptr> it) : parent_(std::move(it)) {} + explicit FlattenIterator(Iterator> it) : parent_(std::move(it)) {} - Status Next(T* out) override { + Status Next(T* out) { if (done_) { - *out = NULLPTR; + *out = IterationTraits::End(); return Status::OK(); } - if (child_ == NULLPTR) { + if (child_ == IterationTraits>::End()) { // Pop from parent's iterator. - ARROW_RETURN_NOT_OK(parent_->Next(&child_)); + ARROW_RETURN_NOT_OK(parent_.Next(&child_)); // Check if final iteration reached. - done_ = (child_ == NULLPTR); + done_ = (child_ == IterationTraits>::End()); return Next(out); } // Pop from child_ and lookout for depletion. - ARROW_RETURN_NOT_OK(child_->Next(out)); - if (*out == NULLPTR) { + ARROW_RETURN_NOT_OK(child_.Next(out)); + if (*out == IterationTraits::End()) { // Reset state such that we pop from parent on the recursive call - child_ = NULLPTR; + child_ = IterationTraits>::End(); return Next(out); } @@ -228,17 +293,16 @@ class FlattenIterator : public Iterator { } private: - std::unique_ptr> parent_; - std::unique_ptr> child_ = NULLPTR; + Iterator> parent_; + Iterator child_ = IterationTraits>::End(); // The usage of done_ could be avoided by setting parent_ to null, but this // would hamper debugging. bool done_ = false; }; template -std::unique_ptr> MakeFlattenIterator( - std::unique_ptr>>> it) { - return std::unique_ptr>(new FlattenIterator(std::move(it))); +Iterator MakeFlattenIterator(Iterator> it) { + return Iterator(FlattenIterator(std::move(it))); } } // namespace arrow diff --git a/cpp/src/arrow/util/iterator_test.cc b/cpp/src/arrow/util/iterator_test.cc index 4843d39cea9..15f8044bd1a 100644 --- a/cpp/src/arrow/util/iterator_test.cc +++ b/cpp/src/arrow/util/iterator_test.cc @@ -26,7 +26,7 @@ namespace arrow { template -std::vector IteratorToVector(std::unique_ptr>& iterator) { +std::vector IteratorToVector(Iterator iterator) { std::vector out; auto fn = [&out](T value) -> Status { @@ -34,91 +34,107 @@ std::vector IteratorToVector(std::unique_ptr>& iterator) { return Status::OK(); }; - ARROW_EXPECT_OK(iterator->Visit(fn)); + ARROW_EXPECT_OK(iterator.Visit(fn)); return out; } -#define EmptyIt MakeEmptyIterator -#define VectorIt MakeVectorIterator -#define FlattenIt MakeFlattenIterator +struct TestInt { + TestInt() : value(-999) {} + TestInt(int i) : value(i) {} // NOLINT runtime/explicit + int value; -// Iterators works on pointers -static const int one = 1; -static const int two = 2; -static const int three = 3; + bool operator==(const TestInt& other) const { return value == other.value; } +}; + +template <> +struct IterationTraits { + static TestInt End() { return TestInt(); } +}; template -void AssertIteratorMatch(std::vector expected, std::unique_ptr> actual) { - EXPECT_EQ(expected, IteratorToVector(actual)); +inline Iterator EmptyIt() { + return MakeEmptyIterator(); +} + +inline Iterator VectorIt(std::vector v) { + return MakeVectorIterator(std::move(v)); } template -void AssertIteratorNoMatch(std::vector expected, std::unique_ptr> actual) { - EXPECT_NE(expected, IteratorToVector(actual)); +inline Iterator VectorIt(std::vector v) { + return MakeVectorIterator(std::move(v)); } -TEST(TestEmptyIterator, Basic) { AssertIteratorMatch({}, EmptyIt()); } +template +inline Iterator FlattenIt(Iterator> its) { + return MakeFlattenIterator(std::move(its)); +} -TEST(TestVectorIterator, Basic) { - std::vector input{&one, &two, &three}; +template +void AssertIteratorMatch(std::vector expected, Iterator actual) { + EXPECT_EQ(expected, IteratorToVector(std::move(actual))); +} - AssertIteratorMatch({}, VectorIt(std::vector{})); - AssertIteratorMatch({&one, &two, &three}, VectorIt(input)); +template +void AssertIteratorNoMatch(std::vector expected, Iterator actual) { + EXPECT_NE(expected, IteratorToVector(std::move(actual))); +} - AssertIteratorNoMatch({&one}, VectorIt(std::vector{})); - AssertIteratorNoMatch({}, VectorIt(input)); - AssertIteratorNoMatch({&one, &two, &two}, VectorIt(input)); - AssertIteratorNoMatch({&one, &two, &three, &one}, VectorIt(input)); +TEST(TestEmptyIterator, Basic) { AssertIteratorMatch({}, EmptyIt()); } + +TEST(TestVectorIterator, Basic) { + AssertIteratorMatch({}, VectorIt({})); + AssertIteratorMatch({1, 2, 3}, VectorIt({1, 2, 3})); + + AssertIteratorNoMatch({1}, VectorIt({})); + AssertIteratorNoMatch({}, VectorIt({1, 2, 3})); + AssertIteratorNoMatch({1, 2, 2}, VectorIt({1, 2, 3})); + AssertIteratorNoMatch({1, 2, 3, 1}, VectorIt({1, 2, 3})); } TEST(FlattenVectorIterator, Basic) { - std::vector input{&one, &two, &three}; - - // Flatten expects to consume Iterator>> - AssertIteratorMatch({}, FlattenIt(EmptyIt>>())); - - // unique_ptr and initializer lists is a nono - std::vector>> ok{3}; - ok[0] = VectorIt(std::vector{&one}); - ok[1] = VectorIt(std::vector{&two}); - ok[2] = VectorIt(std::vector{&three}); - AssertIteratorMatch(input, FlattenIt(VectorIt(std::move(ok)))); - - std::vector>> not_enough{2}; - not_enough[0] = VectorIt(std::vector{&one}); - not_enough[1] = VectorIt(std::vector{&two}); - AssertIteratorNoMatch(input, FlattenIt(VectorIt(std::move(not_enough)))); - - std::vector>> too_much{4}; - too_much[0] = VectorIt(std::vector{&one}); - too_much[1] = VectorIt(std::vector{&two}); - too_much[2] = VectorIt(std::vector{&three}); - too_much[3] = VectorIt(std::vector{&two}); - AssertIteratorNoMatch(input, FlattenIt(VectorIt(std::move(too_much)))); + // Flatten expects to consume Iterator> + AssertIteratorMatch({}, FlattenIt(EmptyIt>())); + + std::vector> ok; + ok.push_back(VectorIt({1})); + ok.push_back(VectorIt({2})); + ok.push_back(VectorIt({3})); + AssertIteratorMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(ok)))); + + std::vector> not_enough; + not_enough.push_back(VectorIt({1})); + not_enough.push_back(VectorIt({2})); + AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(not_enough)))); + + std::vector> too_much; + too_much.push_back(VectorIt({1})); + too_much.push_back(VectorIt({2})); + too_much.push_back(VectorIt({3})); + too_much.push_back(VectorIt({2})); + AssertIteratorNoMatch({1, 2, 3}, FlattenIt(VectorIt(std::move(too_much)))); } -template -std::unique_ptr> Join(T a, T b) { - std::vector>> joined{2}; - joined[0] = VectorIt(std::vector{a}); - joined[1] = VectorIt(std::vector{b}); +Iterator Join(TestInt a, TestInt b) { + std::vector> joined{2}; + joined[0] = VectorIt({a}); + joined[1] = VectorIt({b}); return FlattenIt(VectorIt(std::move(joined))); } -template -std::unique_ptr> Join(T a, std::unique_ptr> b) { - std::vector>> joined{2}; - joined[0] = VectorIt(std::vector{a}); +Iterator Join(TestInt a, Iterator b) { + std::vector> joined{2}; + joined[0] = VectorIt(std::vector{a}); joined[1] = std::move(b); return FlattenIt(VectorIt(std::move(joined))); } TEST(FlattenVectorIterator, Pyramid) { - auto it = Join(&one, Join(&two, Join(&two, Join(&three, Join(&three, &three))))); - AssertIteratorMatch({&one, &two, &two, &three, &three, &three}, std::move(it)); + auto it = Join(1, Join(2, Join(2, Join(3, Join(3, 3))))); + AssertIteratorMatch({1, 2, 2, 3, 3, 3}, std::move(it)); } } // namespace arrow diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc index 1116a1b70d1..adb7e175e5a 100644 --- a/cpp/src/parquet/arrow/reader.cc +++ b/cpp/src/parquet/arrow/reader.cc @@ -352,30 +352,6 @@ class ColumnChunkReaderImpl : public ColumnChunkReader { int row_group_index_; }; -struct RowGroupReader::Iterator : ::arrow::TableBatchReader { - explicit Iterator(const std::shared_ptr
& table) - : TableBatchReader(*table), table_(table) {} - // TableBatchReader does not take ownership of table - std::shared_ptr
table_; -}; - -Status RowGroupReader::MakeIterator( - std::unique_ptr<::arrow::RecordBatchIterator>* batches) { - std::shared_ptr<::arrow::Table> table; - RETURN_NOT_OK(ReadTable(&table)); - batches->reset(new Iterator(table)); - return Status::OK(); -} - -Status RowGroupReader::MakeIterator( - const std::vector& column_indices, - std::unique_ptr<::arrow::RecordBatchIterator>* batches) { - std::shared_ptr<::arrow::Table> table; - RETURN_NOT_OK(ReadTable(column_indices, &table)); - batches->reset(new Iterator(table)); - return Status::OK(); -} - class RowGroupReaderImpl : public RowGroupReader { public: RowGroupReaderImpl(FileReaderImpl* impl, int row_group_index) diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h index ca8ff1c36e5..17ebf925bb9 100644 --- a/cpp/src/parquet/arrow/reader.h +++ b/cpp/src/parquet/arrow/reader.h @@ -22,7 +22,6 @@ #include #include -#include "arrow/util/iterator.h" #include "parquet/file_reader.h" #include "parquet/platform.h" #include "parquet/properties.h" @@ -232,11 +231,6 @@ class RowGroupReader { std::shared_ptr<::arrow::Table>* out) = 0; virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0; - ::arrow::Status MakeIterator(std::unique_ptr<::arrow::RecordBatchIterator>* batches); - - ::arrow::Status MakeIterator(const std::vector& column_indices, - std::unique_ptr<::arrow::RecordBatchIterator>* batches); - private: struct Iterator; };